package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;

import com.google.common.collect.Sets;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.LinkedList;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FileChunk;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.serializer.WritableSerialization;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.runtime.api.InputContext;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.combine.Combiner;
import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MapOutput;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MergeManager;
import org.apache.tez.runtime.library.common.sort.impl.IFile;
import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.class */
public class TestMergeManager {
    // Shared logger; the data-generation helpers in this class also pass it to ShuffleUtils.shuffleToMemory.
    private static final Logger LOG = LoggerFactory.getLogger(TestMergeManager.class);
    // Base configuration; each test copies it with new TezConfiguration(defaultConf) before applying its own settings.
    private static Configuration defaultConf = new TezConfiguration();
    // File system used to create and delete the test directories. Not assigned at this declaration.
    private static FileSystem localFs;
    // Root scratch directory: tests create "local" and "srcData" beneath it and cleanup() deletes it recursively.
    // Not assigned at this declaration.
    private static Path workDir;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager$InterruptingThread.class */
    /**
     * Runnable that interrupts an on-disk merger thread as soon as the merger's
     * {@code tmpDir} field becomes non-null.
     */
    public class InterruptingThread implements Runnable {
        /** The merger thread that {@link #run()} watches and then interrupts. */
        MergeManager.OnDiskMerger mergeThread;

        /**
         * @param onDiskMerger the merger thread to interrupt
         */
        public InterruptingThread(MergeManager.OnDiskMerger onDiskMerger) {
            mergeThread = onDiskMerger;
        }

        /**
         * Spins until {@code mergeThread.tmpDir} is set, then calls {@code interrupt()} on the merger.
         */
        @Override
        public void run() {
            // Deliberate tight loop with no sleep or yield.
            // NOTE(review): this relies on writes to tmpDir becoming visible to this thread —
            // confirm the field is volatile (or otherwise safely published) in OnDiskMerger.
            while (mergeThread.tmpDir == null) {
                // keep spinning
            }
            mergeThread.interrupt();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager$SrcFileInfo.class */
    /**
     * Plain holder pairing a {@link Path} with an array of {@link TezIndexRecord}s.
     * Instances can only be created from within the enclosing test class.
     */
    public class SrcFileInfo {
        // Index records kept alongside the path.
        // NOTE(review): presumably the index entries of the file at `path` — confirm against the code that fills this in.
        private TezIndexRecord[] indexedRecords;
        // Path held by this holder; presumably the location of a source file (going by the class name) — confirm.
        private Path path;

        private SrcFileInfo() {
            // Nothing to initialise; both fields start out null.
        }
    }

    /**
     * Recursively deletes the shared work directory. The method carries both
     * {@code @Before} and {@code @After}, so it is registered to run on either side of each test.
     *
     * @throws IOException if the delete call fails
     */
    @Before
    @After
    public void cleanup() throws IOException {
        final boolean recursive = true;
        localFs.delete(workDir, recursive);
    }

    /**
     * Checks how {@code MergeManager} derives memory limits from the shuffle fetch buffer
     * percent and the post-merge buffer percent.
     *
     * <ol>
     *   <li>{@code getInitialMemoryRequirement} for an 8 GiB task with several valid percent pairs.</li>
     *   <li>Out-of-range percents must be rejected with {@link IllegalArgumentException}.</li>
     *   <li>{@code postMergeMemLimit} of a constructed manager for a large and a small initial memory.</li>
     * </ol>
     *
     * @throws IOException if the local file system or the merge manager cannot be created
     */
    @Test(timeout = 10000)
    public void testConfigs() throws IOException {
        final String fetchBufferPercent = "tez.runtime.shuffle.fetch.buffer.percent";
        final String postMergeBufferPercent = "tez.runtime.task.input.post-merge.buffer.percent";
        final long maxTaskMem = 8589934592L; // 8 * 1024^3 bytes
        final long intMax = Integer.MAX_VALUE;

        TezConfiguration conf = new TezConfiguration(defaultConf);

        // --- valid percent pairs ---
        conf.setFloat(fetchBufferPercent, 0.8f);
        conf.setFloat(postMergeBufferPercent, 0.5f);
        // 6871947776 is 8 GiB scaled by 0.8f (the float value, not the exact decimal 0.8).
        Assert.assertEquals(6871947776L, MergeManager.getInitialMemoryRequirement(conf, maxTaskMem));

        conf.setFloat(fetchBufferPercent, 0.5f);
        conf.setFloat(postMergeBufferPercent, 0.5f);
        Assert.assertTrue("requirement should exceed Integer.MAX_VALUE",
                MergeManager.getInitialMemoryRequirement(conf, maxTaskMem) > intMax);

        conf.setFloat(fetchBufferPercent, 0.4f);
        conf.setFloat(postMergeBufferPercent, 0.9f);
        Assert.assertTrue("requirement should exceed Integer.MAX_VALUE",
                MergeManager.getInitialMemoryRequirement(conf, maxTaskMem) > intMax);

        conf.setFloat(fetchBufferPercent, 0.1f);
        conf.setFloat(postMergeBufferPercent, 0.1f);
        Assert.assertTrue("requirement should stay below Integer.MAX_VALUE",
                MergeManager.getInitialMemoryRequirement(conf, maxTaskMem) < intMax);

        // --- out-of-range percents ---
        assertRejectsBufferPercent(conf, fetchBufferPercent, 2.4f, maxTaskMem,
                "Should have thrown wrong buffer percent configuration exception");
        assertRejectsBufferPercent(conf, fetchBufferPercent, -2.4f, maxTaskMem,
                "Should have thrown wrong buffer percent configuration exception");
        // NOTE(review): the fetch buffer percent is still -2.4f while the next two checks run, so
        // they may be rejected because of that earlier value rather than the post-merge value.
        // Confirm, and reset the fetch percent first if the post-merge validation is the target.
        assertRejectsBufferPercent(conf, postMergeBufferPercent, 1.4f, maxTaskMem,
                "Should have thrown wrong post merge buffer percent configuration exception");
        assertRejectsBufferPercent(conf, postMergeBufferPercent, -1.4f, maxTaskMem,
                "Should have thrown wrong post merge buffer percent configuration exception");
        assertRejectsBufferPercent(conf, fetchBufferPercent, 1.4f, maxTaskMem,
                "Should have thrown wrong shuffle fetch buffer percent configuration exception");
        assertRejectsBufferPercent(conf, fetchBufferPercent, -1.4f, maxTaskMem,
                "Should have thrown wrong shuffle fetch buffer percent configuration exception");

        // --- post-merge limit of constructed managers ---
        conf.setFloat(fetchBufferPercent, 0.4f);
        conf.setFloat(postMergeBufferPercent, 0.8f);
        LocalFileSystem fs = FileSystem.getLocal(conf);
        LocalDirAllocator allocator = new LocalDirAllocator("tez.runtime.framework.local.dirs");
        InputContext inputContext = createMockInputContext(UUID.randomUUID().toString(), maxTaskMem);
        ExceptionReporter reporter = Mockito.mock(ExceptionReporter.class);

        long largeInitialMemory = (long) (maxTaskMem * 0.8d);
        MergeManager largeManager = new MergeManager(conf, fs, allocator, inputContext,
                (Combiner) null, (TezCounter) null, (TezCounter) null, (TezCounter) null,
                reporter, largeInitialMemory, (CompressionCodec) null, false, -1);
        Assert.assertTrue("postMergeMemLimit should exceed Integer.MAX_VALUE",
                largeManager.postMergeMemLimit > intMax);

        MergeManager smallManager = new MergeManager(conf, fs, allocator, inputContext,
                (Combiner) null, (TezCounter) null, (TezCounter) null, (TezCounter) null,
                reporter, 209715200L, (CompressionCodec) null, false, -1);
        Assert.assertEquals(209715200L, smallManager.postMergeMemLimit);
    }

    /**
     * Sets {@code key} to {@code value} on {@code conf} and asserts that
     * {@code MergeManager.getInitialMemoryRequirement} then throws {@link IllegalArgumentException}.
     * The value is left in {@code conf} afterwards.
     *
     * @param conf          configuration to mutate
     * @param key           float-valued configuration key to set
     * @param value         value expected to be rejected
     * @param maxTaskMemory task memory passed to {@code getInitialMemoryRequirement}
     * @param failMessage   assertion message used when no exception is thrown
     */
    private static void assertRejectsBufferPercent(Configuration conf, String key, float value,
            long maxTaskMemory, String failMessage) {
        try {
            conf.setFloat(key, value);
            MergeManager.getInitialMemoryRequirement(conf, maxTaskMemory);
            Assert.fail(failMessage);
        } catch (IllegalArgumentException expected) {
            // Rejection is the behaviour under test; nothing else to verify.
        }
    }

    /**
     * Walks a merge manager through reserve/abort and reserve/close/release and checks the
     * used and committed memory counters after every step.
     *
     * @throws IOException if the merge manager or one of its operations fails
     */
    @Test(timeout = 10000)
    public void testReservationAccounting() throws IOException {
        TezConfiguration conf = new TezConfiguration(defaultConf);
        LocalFileSystem fs = FileSystem.getLocal(conf);
        InputContext inputContext = createMockInputContext(UUID.randomUUID().toString());
        ExceptionReporter reporter = Mockito.mock(ExceptionReporter.class);
        MergeManager manager = new MergeManager(
                conf, fs, (LocalDirAllocator) null, inputContext,
                (Combiner) null, (TezCounter) null, (TezCounter) null, (TezCounter) null,
                reporter, 2000000L, (CompressionCodec) null, false, -1);
        manager.configureAndStart();

        // Nothing reserved yet.
        Assert.assertEquals(0L, manager.getUsedMemory());
        Assert.assertEquals(0L, manager.getCommitMemory());

        // A reservation counts as used but not committed ...
        MapOutput aborted = manager.reserve((InputAttemptIdentifier) null, 1L, 1L, 0);
        Assert.assertEquals(1L, manager.getUsedMemory());
        Assert.assertEquals(0L, manager.getCommitMemory());

        // ... and aborting it hands the memory back.
        aborted.abort();
        Assert.assertEquals(0L, manager.getUsedMemory());
        Assert.assertEquals(0L, manager.getCommitMemory());

        // Closing an in-memory output moves its size into committed memory.
        MapOutput closed = manager.reserve((InputAttemptIdentifier) null, 2L, 2L, 0);
        manager.closeInMemoryFile(closed);
        Assert.assertEquals(2L, manager.getUsedMemory());
        Assert.assertEquals(2L, manager.getCommitMemory());

        // Releasing committed memory brings both counters back to zero.
        manager.releaseCommittedMemory(2L);
        Assert.assertEquals(0L, manager.getUsedMemory());
        Assert.assertEquals(0L, manager.getCommitMemory());
    }

    /**
     * With memory-to-memory merging enabled for two segments, reserves and commits two
     * in-memory outputs and checks the used and committed memory counters before the commits,
     * between them, and after waiting for the memory-to-memory merge.
     *
     * @throws Exception if set-up, a reservation, a commit or the merge wait fails
     */
    @Test(timeout = 20000)
    public void testIntermediateMemoryMergeAccounting() throws Exception {
        TezConfiguration conf = new TezConfiguration(defaultConf);
        conf.setBoolean("tez.runtime.compress", false);
        conf.set("tez.runtime.key.class", IntWritable.class.getName());
        conf.set("tez.runtime.value.class", IntWritable.class.getName());
        conf.setBoolean("tez.runtime.shuffle.memory-to-memory.enable", true);
        conf.setInt("tez.runtime.shuffle.memory-to-memory.segments", 2);

        Path localDir = new Path(workDir, "local");
        Path srcDir = new Path(workDir, "srcData");
        localFs.mkdirs(localDir);
        localFs.mkdirs(srcDir);
        conf.setStrings("tez.runtime.framework.local.dirs", new String[]{localDir.toString()});

        LocalFileSystem fs = FileSystem.getLocal(conf);
        LocalDirAllocator allocator = new LocalDirAllocator("tez.runtime.framework.local.dirs");
        InputContext inputContext = createMockInputContext(UUID.randomUUID().toString());
        ExceptionReporter reporter = Mockito.mock(ExceptionReporter.class);
        MergeManager manager = new MergeManager(
                conf, fs, allocator, inputContext,
                (Combiner) null, (TezCounter) null, (TezCounter) null, (TezCounter) null,
                reporter, 2000000L, (CompressionCodec) null, false, -1);
        manager.configureAndStart();
        Assert.assertEquals(0L, manager.getUsedMemory());
        Assert.assertEquals(0L, manager.getCommitMemory());

        // Two payloads holding 10 and 20 records respectively.
        byte[] small = generateData(conf, 10, null);
        byte[] large = generateData(conf, 20, null);
        final int combinedLength = small.length + large.length;

        MapOutput smallOutput = manager.reserve((InputAttemptIdentifier) null, small.length, small.length, 0);
        MapOutput largeOutput = manager.reserve((InputAttemptIdentifier) null, large.length, large.length, 0);
        Assert.assertEquals(MapOutput.Type.MEMORY, smallOutput.getType());
        Assert.assertEquals(MapOutput.Type.MEMORY, largeOutput.getType());
        Assert.assertEquals(0L, manager.getCommitMemory());
        Assert.assertEquals(combinedLength, manager.getUsedMemory());

        System.arraycopy(small, 0, smallOutput.getMemory(), 0, small.length);
        System.arraycopy(large, 0, largeOutput.getMemory(), 0, large.length);

        // Commit the larger output first: only its bytes are committed, both are still in use.
        largeOutput.commit();
        Assert.assertEquals(large.length, manager.getCommitMemory());
        Assert.assertEquals(combinedLength, manager.getUsedMemory());

        // After the second commit and the merge wait, both counters equal the combined size.
        smallOutput.commit();
        manager.waitForMemToMemMerge();
        Assert.assertEquals(combinedLength, manager.getCommitMemory());
        Assert.assertEquals(combinedLength, manager.getUsedMemory());
    }

    /**
     * Commits four outputs through their disk streams into a merge manager configured with a
     * {@code DummyCompressionCodec} and a sort factor of 3, closes the manager, and asserts that
     * the codec's {@code createInputStreamCalled} counter is greater than zero.
     *
     * @throws Throwable if set-up, a write, a commit or the close fails
     */
    @Test
    public void testDiskMergeWithCodec() throws Throwable {
        Configuration conf = new TezConfiguration(defaultConf);
        conf.set("tez.runtime.key.class", IntWritable.class.getName());
        conf.set("tez.runtime.value.class", IntWritable.class.getName());
        conf.setInt("tez.runtime.io.sort.factor", 3);
        Path localDir = new Path(workDir, "local");
        localFs.mkdirs(localDir);
        conf.setStrings("tez.runtime.framework.local.dirs", new String[]{localDir.toString()});
        LocalDirAllocator allocator = new LocalDirAllocator("tez.runtime.framework.local.dirs");
        InputContext inputContext = createMockInputContext(UUID.randomUUID().toString());
        DummyCompressionCodec codec = new DummyCompressionCodec();
        codec.setConf(conf);
        ExceptionReporter reporter = Mockito.mock(ExceptionReporter.class);

        MergeManager manager = new MergeManager(conf, localFs, allocator, inputContext,
                (Combiner) null, (TezCounter) null, (TezCounter) null, (TezCounter) null,
                reporter, 2000L, codec, false, -1);
        manager.configureAndStart();
        Assert.assertEquals(0L, manager.getUsedMemory());
        Assert.assertEquals(0L, manager.getCommitMemory());

        InputAttemptIdentifier[] ids = {
            new InputAttemptIdentifier(0, 0),
            new InputAttemptIdentifier(1, 0),
            new InputAttemptIdentifier(2, 0),
            new InputAttemptIdentifier(3, 0),
        };

        // One payload per identifier. The fourth payload used to be generated with the third
        // identifier (copy-paste slip); each payload now carries the identifier it is reserved under.
        byte[][] payloads = new byte[ids.length][];
        for (int idx = 0; idx < ids.length; idx++) {
            payloads[idx] = generateDataBySizeAndGetBytes(conf, 500, ids[idx]);
        }

        // Reserve everything first, then write, then commit — same ordering as before.
        // NOTE(review): getDisk() is used without asserting the output type; presumably the
        // 2000-byte initial memory makes these reservations disk-backed — confirm.
        MapOutput[] outputs = new MapOutput[ids.length];
        for (int idx = 0; idx < ids.length; idx++) {
            outputs[idx] = manager.reserve(ids[idx], payloads[idx].length, payloads[idx].length, 0);
        }
        for (int idx = 0; idx < outputs.length; idx++) {
            outputs[idx].getDisk().write(payloads[idx]);
            outputs[idx].getDisk().flush();
        }
        for (MapOutput output : outputs) {
            output.commit();
        }

        manager.close(true);
        Assert.assertTrue(codec.createInputStreamCalled > 0);
    }

    /**
     * Drives the memory-to-memory merger through five scenarios, each on a fresh merge manager
     * created with 2,000,000 bytes of initial memory. Every scenario reserves four in-memory
     * outputs, copies generated data into them, commits them in order and waits for the
     * memory-to-memory merge before checking the manager's output collections.
     *
     * <ol>
     *   <li>segments=3, sizes 10/20/200/20000: 1 merged output, 1 unmerged output.</li>
     *   <li>segments=3, sizes 10/400000/400000/400000: 1 merged output, 2 unmerged outputs.</li>
     *   <li>segments=3, sizes 400000 x 4: 0 merged outputs, 4 unmerged outputs.</li>
     *   <li>segments=4, sizes 490000 x 3 + 230000: 4 unmerged outputs, then {@code close(true)}.</li>
     *   <li>Same as 4, but {@code close(false)} must return null and leave the merge incomplete.</li>
     * </ol>
     *
     * @throws Throwable if set-up, a reservation, a commit, a merge wait or a close fails
     */
    @Test(timeout = 60000)
    public void testIntermediateMemoryMerge() throws Throwable {
        final String segmentsKey = "tez.runtime.shuffle.memory-to-memory.segments";

        TezConfiguration conf = new TezConfiguration(defaultConf);
        conf.setBoolean("tez.runtime.compress", false);
        conf.set("tez.runtime.key.class", IntWritable.class.getName());
        conf.set("tez.runtime.value.class", IntWritable.class.getName());
        conf.setBoolean("tez.runtime.shuffle.memory-to-memory.enable", true);
        conf.setInt(segmentsKey, 3);
        Path localDir = new Path(workDir, "local");
        Path srcDir = new Path(workDir, "srcData");
        localFs.mkdirs(localDir);
        localFs.mkdirs(srcDir);
        conf.setStrings("tez.runtime.framework.local.dirs", new String[]{localDir.toString()});
        LocalFileSystem fs = FileSystem.getLocal(conf);
        LocalDirAllocator allocator = new LocalDirAllocator("tez.runtime.framework.local.dirs");
        InputContext inputContext = createMockInputContext(UUID.randomUUID().toString());
        ExceptionReporter reporter = Mockito.mock(ExceptionReporter.class);

        InputAttemptIdentifier id1 = new InputAttemptIdentifier(0, 0);
        InputAttemptIdentifier id2 = new InputAttemptIdentifier(1, 0);
        InputAttemptIdentifier id3 = new InputAttemptIdentifier(2, 0);
        InputAttemptIdentifier id4 = new InputAttemptIdentifier(3, 0);
        InputAttemptIdentifier[] distinctIds = {id1, id2, id3, id4};

        // ---- Scenario 1: three small outputs plus one of 20000 bytes ----
        MergeManager manager = startMemToMemMergeManager(conf, fs, allocator, inputContext, reporter);
        Assert.assertEquals(0L, manager.getUsedMemory());
        Assert.assertEquals(0L, manager.getCommitMemory());
        byte[][] data = {
            generateDataBySize(conf, 10, id1),
            generateDataBySize(conf, 20, id2),
            generateDataBySize(conf, 200, id3),
            generateDataBySize(conf, 20000, id4),
        };
        // NOTE(review): all four reservations in this scenario use the first identifier, unlike
        // the later scenarios. Kept as-is; confirm whether this is intentional or a copy-paste slip.
        MapOutput[] outputs = reserveInMemoryAndVerify(manager,
                new InputAttemptIdentifier[]{id1, id1, id1, id1}, data);
        fillAndCommitAll(outputs, data);
        manager.waitForMemToMemMerge();
        Assert.assertEquals(1L, manager.inMemoryMergedMapOutputs.size());
        Assert.assertEquals(1L, manager.inMemoryMapOutputs.size());
        manager.close(true);

        // ---- Scenario 2: one tiny output plus three of 400000 bytes ----
        manager = startMemToMemMergeManager(conf, fs, allocator, inputContext, reporter);
        data = new byte[][] {
            generateDataBySize(conf, 10, id1),
            generateDataBySize(conf, 400000, id2),
            generateDataBySize(conf, 400000, id3),
            generateDataBySize(conf, 400000, id4),
        };
        outputs = reserveInMemoryAndVerify(manager, distinctIds, data);
        fillAndCommitAll(outputs, data);
        manager.waitForMemToMemMerge();
        Assert.assertEquals(1L, manager.inMemoryMergedMapOutputs.size());
        Assert.assertEquals(2L, manager.inMemoryMapOutputs.size());
        manager.close(true);

        // ---- Scenario 3: four outputs of 400000 bytes, nothing gets merged ----
        manager = startMemToMemMergeManager(conf, fs, allocator, inputContext, reporter);
        data = new byte[][] {
            generateDataBySize(conf, 400000, id1),
            generateDataBySize(conf, 400000, id2),
            generateDataBySize(conf, 400000, id3),
            generateDataBySize(conf, 400000, id4),
        };
        outputs = reserveInMemoryAndVerify(manager, distinctIds, data);
        fillAndCommitAll(outputs, data);
        manager.waitForMemToMemMerge();
        Assert.assertEquals(0L, manager.inMemoryMergedMapOutputs.size());
        Assert.assertEquals(4L, manager.inMemoryMapOutputs.size());
        manager.close(true);

        // ---- Scenario 4: segments raised to 4, three outputs of 490000 bytes plus one of 230000 ----
        conf.setInt(segmentsKey, 4);
        manager = startMemToMemMergeManager(conf, fs, allocator, inputContext, reporter);
        data = new byte[][] {
            generateDataBySize(conf, 490000, id1),
            generateDataBySize(conf, 490000, id2),
            generateDataBySize(conf, 490000, id3),
            generateDataBySize(conf, 230000, id4),
        };
        outputs = reserveInMemoryAndVerify(manager, distinctIds, data);
        Assert.assertTrue(manager.getUsedMemory() >= 1493000);
        fillAndCommitAll(outputs, data);
        manager.waitForMemToMemMerge();
        Assert.assertEquals(4, manager.inMemoryMapOutputs.size());
        manager.close(true);

        // ---- Scenario 5: same data as scenario 4, but closed without the final merge ----
        conf.setInt(segmentsKey, 4); // already 4; repeated so the scenario reads on its own
        manager = startMemToMemMergeManager(conf, fs, allocator, inputContext, reporter);
        data = new byte[][] {
            generateDataBySize(conf, 490000, id1),
            generateDataBySize(conf, 490000, id2),
            generateDataBySize(conf, 490000, id3),
            generateDataBySize(conf, 230000, id4),
        };
        outputs = reserveInMemoryAndVerify(manager, distinctIds, data);
        Assert.assertTrue(manager.getUsedMemory() >= 1493000);
        fillAndCommitAll(outputs, data);
        manager.waitForMemToMemMerge();
        Assert.assertEquals(4, manager.inMemoryMapOutputs.size());
        Assert.assertNull(manager.close(false));
        Assert.assertFalse(manager.isMergeComplete());
    }

    /**
     * Creates a merge manager with 2,000,000 bytes of initial memory, no combiner, no counters
     * and no codec, then calls {@code configureAndStart()} on it.
     */
    private MergeManager startMemToMemMergeManager(TezConfiguration conf, LocalFileSystem fs,
            LocalDirAllocator allocator, InputContext inputContext, ExceptionReporter reporter)
            throws Exception {
        MergeManager manager = new MergeManager(conf, fs, allocator, inputContext,
                (Combiner) null, (TezCounter) null, (TezCounter) null, (TezCounter) null,
                reporter, 2000000L, (CompressionCodec) null, false, -1);
        manager.configureAndStart();
        return manager;
    }

    /**
     * Reserves one output per data buffer (in array order) and asserts that every output is of
     * type MEMORY, that nothing is committed yet, and that used memory equals the summed lengths.
     */
    private MapOutput[] reserveInMemoryAndVerify(MergeManager manager, InputAttemptIdentifier[] ids,
            byte[][] data) throws Exception {
        MapOutput[] outputs = new MapOutput[data.length];
        long expectedUsed = 0L;
        for (int idx = 0; idx < data.length; idx++) {
            outputs[idx] = manager.reserve(ids[idx], data[idx].length, data[idx].length, 0);
            expectedUsed += data[idx].length;
        }
        for (MapOutput output : outputs) {
            Assert.assertEquals(MapOutput.Type.MEMORY, output.getType());
        }
        Assert.assertEquals(0L, manager.getCommitMemory());
        Assert.assertEquals(expectedUsed, manager.getUsedMemory());
        return outputs;
    }

    /**
     * Copies every data buffer into its output's memory, then commits the outputs in array order.
     * All copies happen before the first commit.
     */
    private void fillAndCommitAll(MapOutput[] outputs, byte[][] data) throws Exception {
        for (int idx = 0; idx < outputs.length; idx++) {
            System.arraycopy(data[idx], 0, outputs[idx].getMemory(), 0, data[idx].length);
        }
        for (MapOutput output : outputs) {
            output.commit();
        }
    }

    /**
     * Builds an IFile of (n, n) IntWritable pairs, n = 0, 1, 2, ..., appending at least one pair
     * and stopping once the writer's raw length exceeds {@code i}. The stream is then passed
     * through {@code ShuffleUtils.shuffleToMemory} into a buffer of the raw length.
     *
     * @param configuration          not used by this method
     * @param i                      raw-length threshold in bytes
     * @param inputAttemptIdentifier identifier forwarded to {@code shuffleToMemory}
     * @return the buffer filled by {@code shuffleToMemory}; its length is the writer's raw length
     * @throws IOException if writing or shuffling fails
     */
    private byte[] generateDataBySize(Configuration configuration, int i, InputAttemptIdentifier inputAttemptIdentifier) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        FSDataOutputStream out = new FSDataOutputStream(sink, (FileSystem.Statistics) null);
        IFile.Writer writer = new IFile.Writer(
                new WritableSerialization(), new WritableSerialization(), out,
                IntWritable.class, IntWritable.class,
                (CompressionCodec) null, (TezCounter) null, (TezCounter) null);

        // Always append one record, then keep going while the raw length is still within the threshold.
        for (int next = 0; ; next++) {
            writer.append(new IntWritable(next), new IntWritable(next));
            if (writer.getRawLength() > i) {
                break;
            }
        }
        writer.close();

        final int compressedLength = (int) writer.getCompressedLength();
        final int rawLength = (int) writer.getRawLength();
        final byte[] shuffled = new byte[rawLength];
        ByteArrayInputStream serialized = new ByteArrayInputStream(sink.toByteArray());
        ShuffleUtils.shuffleToMemory(shuffled, serialized, rawLength, compressedLength,
                (CompressionCodec) null, false, 0, LOG, inputAttemptIdentifier);
        return shuffled;
    }

    /**
     * Builds an IFile of (n, n) IntWritable pairs, n = 0, 1, 2, ..., appending at least one pair
     * and stopping once the writer's raw length exceeds {@code i}. The stream is passed through
     * {@code ShuffleUtils.shuffleToMemory} into a scratch buffer that is discarded; the bytes
     * returned are the stream contents exactly as the writer produced them.
     *
     * @param configuration          not used by this method
     * @param i                      raw-length threshold in bytes
     * @param inputAttemptIdentifier identifier forwarded to {@code shuffleToMemory}
     * @return the bytes written to the underlying output stream
     * @throws IOException if writing or shuffling fails
     */
    private byte[] generateDataBySizeAndGetBytes(Configuration configuration, int i, InputAttemptIdentifier inputAttemptIdentifier) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        FSDataOutputStream out = new FSDataOutputStream(sink, (FileSystem.Statistics) null);
        IFile.Writer writer = new IFile.Writer(
                new WritableSerialization(), new WritableSerialization(), out,
                IntWritable.class, IntWritable.class,
                (CompressionCodec) null, (TezCounter) null, (TezCounter) null);

        // Always append one record, then keep going while the raw length is still within the threshold.
        for (int next = 0; ; next++) {
            writer.append(new IntWritable(next), new IntWritable(next));
            if (writer.getRawLength() > i) {
                break;
            }
        }
        writer.close();

        final int compressedLength = (int) writer.getCompressedLength();
        final int rawLength = (int) writer.getRawLength();
        final byte[] serialized = sink.toByteArray();
        // The shuffled copy is thrown away; the call still runs the stream through shuffleToMemory.
        final byte[] scratch = new byte[rawLength];
        ShuffleUtils.shuffleToMemory(scratch, new ByteArrayInputStream(serialized), rawLength, compressedLength,
                (CompressionCodec) null, false, 0, LOG, inputAttemptIdentifier);
        return serialized;
    }

    /**
     * Builds an in-memory IFile holding exactly {@code i} {@code IntWritable} key/value pairs
     * (key == value, counting up from 0), passes the serialized bytes through
     * {@link ShuffleUtils#shuffleToMemory} into a buffer sized to the writer's raw length, and
     * returns that buffer.
     *
     * @param configuration unused by this helper
     * @param i number of records to write; zero or negative writes none
     * @param inputAttemptIdentifier identifier forwarded to {@code shuffleToMemory}
     * @return the buffer filled by {@code shuffleToMemory}
     * @throws IOException if writing or shuffling the data fails
     */
    private byte[] generateData(Configuration configuration, int i, InputAttemptIdentifier inputAttemptIdentifier) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        FSDataOutputStream sinkStream = new FSDataOutputStream(sink, (FileSystem.Statistics) null);
        IFile.Writer writer = new IFile.Writer(new WritableSerialization(), new WritableSerialization(), sinkStream, IntWritable.class, IntWritable.class, (CompressionCodec) null, (TezCounter) null, (TezCounter) null);

        int key = 0;
        while (key < i) {
            writer.append(new IntWritable(key), new IntWritable(key));
            key++;
        }
        writer.close();

        int compressedLength = (int) writer.getCompressedLength();
        int rawLength = (int) writer.getRawLength();

        byte[] shuffled = new byte[rawLength];
        ByteArrayInputStream serialized = new ByteArrayInputStream(sink.toByteArray());
        ShuffleUtils.shuffleToMemory(shuffled, serialized, rawLength, compressedLength, (CompressionCodec) null, false, 0, LOG, inputAttemptIdentifier);
        return shuffled;
    }

    /**
     * Drives the parameterised local-disk merge scenario twice: first with the flag off (plain
     * synchronous merge), then with the flag on (delayed close plus interrupting thread).
     */
    @Test(timeout = 10000)
    public void testLocalDiskMergeMultipleTasks() throws IOException, InterruptedException {
        // Order matters only in that it mirrors the original sequence: false, then true.
        for (boolean interruptMode : new boolean[]{false, true}) {
            testLocalDiskMergeMultipleTasks(interruptMode);
        }
    }

    /**
     * Runs three consecutive on-disk merges through one spied {@link MergeManager} and checks the
     * names of the merged files:
     * <ul>
     *   <li>each merge leaves exactly one on-disk output whose path ends in
     *       {@code merged0}, {@code merged1}, {@code merged2} respectively;</li>
     *   <li>all three paths have the same length and are pairwise distinct where asserted;</li>
     *   <li>the part of the path before the last {@code '.'} is the same for the first two
     *       merges and different for the third, whose inputs are fed in reversed order.</li>
     * </ul>
     * Finally verifies that the mocked input context was notified of progress at least once.
     */
    @Test(timeout = 10000)
    public void testOnDiskMergerFilenames() throws IOException, InterruptedException {
        // --- configuration and directories -------------------------------------------------
        String intWritableName = IntWritable.class.getName();
        TezConfiguration conf = new TezConfiguration(defaultConf);
        conf.setBoolean("tez.runtime.compress", false);
        conf.set("tez.runtime.key.class", intWritableName);
        conf.set("tez.runtime.value.class", intWritableName);

        Path localDir = new Path(workDir, "local");
        Path srcDir = new Path(workDir, "srcData");
        localFs.mkdirs(localDir);
        localFs.mkdirs(srcDir);
        conf.setStrings("tez.runtime.framework.local.dirs", localDir.toString());

        // --- merge manager under test (spied so closeOnDiskFile calls can be verified) ------
        LocalFileSystem fs = FileSystem.getLocal(conf);
        LocalDirAllocator dirAllocator = new LocalDirAllocator("tez.runtime.framework.local.dirs");
        InputContext inputContext = createMockInputContext(UUID.randomUUID().toString());
        ExceptionReporter reporter = Mockito.mock(ExceptionReporter.class);
        MergeManager manager = Mockito.spy(new MergeManager(conf, fs, dirAllocator, inputContext, (Combiner) null, (TezCounter) null, (TezCounter) null, (TezCounter) null, reporter, 1048576L, (CompressionCodec) null, false, -1));

        // --- merge #1: two freshly committed disk outputs ------------------------------------
        SrcFileInfo src1 = createFile(conf, fs, new Path(srcDir, "attemptsrc1.out"), 2, 3, 6);
        SrcFileInfo src2 = createFile(conf, fs, new Path(srcDir, "attemptsrc2.out"), 2, 3, 0);
        InputAttemptIdentifier id1 = new InputAttemptIdentifier(0, 0, src1.path.getName());
        InputAttemptIdentifier id2 = new InputAttemptIdentifier(1, 0, src2.path.getName());
        MapOutput output1 = getMapOutputForDirectDiskFetch(id1, src1.path, src1.indexedRecords[0], manager);
        MapOutput output2 = getMapOutputForDirectDiskFetch(id2, src2.path, src2.indexedRecords[0], manager);
        output1.commit();
        output2.commit();
        Mockito.verify(manager).closeOnDiskFile(output1.getOutputPath());
        Mockito.verify(manager).closeOnDiskFile(output2.getOutputPath());

        // Snapshot and clear the pending set, then invoke the merger directly on the snapshot.
        LinkedList<FileChunk> firstInputs = new LinkedList<>();
        firstInputs.addAll(manager.onDiskMapOutputs);
        manager.onDiskMapOutputs.clear();
        manager.onDiskMerger.merge(firstInputs);
        Assert.assertEquals(1L, manager.onDiskMapOutputs.size());
        Path firstMerged = ((FileChunk) manager.onDiskMapOutputs.iterator().next()).getPath();
        Assert.assertTrue(firstMerged.toString().endsWith("merged0"));

        // --- merge #2: previous merge result plus a third source -----------------------------
        SrcFileInfo src3 = createFile(conf, fs, new Path(srcDir, "attemptsrc3.out"), 2, 22, 5);
        // NOTE(review): the path component comes from src1, not src3 - looks like a copy/paste
        // slip, but it is preserved here; confirm before changing.
        InputAttemptIdentifier id3 = new InputAttemptIdentifier(2, 0, src1.path.getName());
        MapOutput output3 = getMapOutputForDirectDiskFetch(id3, src3.path, src3.indexedRecords[0], manager);
        output3.commit();
        Mockito.verify(manager).closeOnDiskFile(output3.getOutputPath());

        LinkedList<FileChunk> secondInputs = new LinkedList<>();
        secondInputs.addAll(manager.onDiskMapOutputs);
        manager.onDiskMapOutputs.clear();
        manager.onDiskMerger.merge(secondInputs);
        Assert.assertEquals(1L, manager.onDiskMapOutputs.size());
        Path secondMerged = ((FileChunk) manager.onDiskMapOutputs.iterator().next()).getPath();
        Assert.assertTrue(secondMerged.toString().endsWith("merged1"));
        Assert.assertNotEquals(firstMerged, secondMerged);

        // --- merge #3: previous merge result plus a fourth source, inputs reversed -----------
        SrcFileInfo src4 = createFile(conf, fs, new Path(srcDir, "attemptsrc4.out"), 2, 45, 35);
        InputAttemptIdentifier id4 = new InputAttemptIdentifier(3, 0, src4.path.getName());
        MapOutput output4 = getMapOutputForDirectDiskFetch(id4, src4.path, src4.indexedRecords[0], manager);
        output4.commit();
        Mockito.verify(manager).closeOnDiskFile(output4.getOutputPath());

        LinkedList<FileChunk> pending = new LinkedList<>();
        LinkedList<FileChunk> reversedInputs = new LinkedList<>();
        Assert.assertEquals(2L, manager.onDiskMapOutputs.size());
        pending.addAll(manager.onDiskMapOutputs);
        // Swap the two pending chunks so the merger sees them in the opposite order.
        reversedInputs.add(pending.get(1));
        reversedInputs.add(pending.get(0));
        manager.onDiskMapOutputs.clear();
        manager.onDiskMerger.merge(reversedInputs);
        Assert.assertEquals(1L, manager.onDiskMapOutputs.size());
        Path thirdMerged = ((FileChunk) manager.onDiskMapOutputs.iterator().next()).getPath();
        Assert.assertTrue(thirdMerged.toString().endsWith("merged2"));
        Assert.assertNotEquals(secondMerged, thirdMerged);

        // --- cross-merge name comparisons ----------------------------------------------------
        String firstName = firstMerged.toString();
        String secondName = secondMerged.toString();
        String thirdName = thirdMerged.toString();
        Assert.assertEquals(firstName.length(), secondName.length());
        Assert.assertEquals(secondName.length(), thirdName.length());

        // Compare everything before the last '.'; presumably this prefix is derived from the
        // first input handed to the merger, which is why the reversed third merge differs.
        String firstPrefix = firstName.substring(0, firstName.lastIndexOf('.'));
        String secondPrefix = secondName.substring(0, secondName.lastIndexOf('.'));
        String thirdPrefix = thirdName.substring(0, thirdName.lastIndexOf('.'));
        Assert.assertEquals(firstPrefix, secondPrefix);
        Assert.assertNotEquals(firstPrefix, thirdPrefix);
        Assert.assertNotEquals(secondPrefix, thirdPrefix);

        Mockito.verify(inputContext, Mockito.atLeastOnce()).notifyProgress();
    }

    /**
     * Scenario shared by the local-disk merge test: two {@link MergeManager}s, each with its own
     * mocked input context (distinct unique identifier), read from the same two source files.
     * The first manager takes index record 0 of each file, the second takes index record 1.
     *
     * <p>With {@code z == false} both managers merge synchronously; each must end up with exactly
     * one on-disk output, the two merged paths must differ, and each path must contain the
     * unique identifier of its own input context.
     *
     * <p>With {@code z == true} the first manager's {@code closeOnDiskFile} is delayed by two
     * seconds, an {@code InterruptingThread} is started against its on-disk merger, the merge is
     * started asynchronously, and after {@code waitForMerge()} the manager must NOT hold exactly
     * one on-disk output. The second manager is not exercised in this mode.
     *
     * @param z {@code true} to run the delayed/interrupted variant
     * @throws IOException if file setup or merging fails
     * @throws InterruptedException if the test thread is interrupted while sleeping or waiting
     */
    void testLocalDiskMergeMultipleTasks(final boolean z) throws IOException, InterruptedException {
        // --- configuration and directories -------------------------------------------------
        TezConfiguration conf = new TezConfiguration(defaultConf);
        conf.setBoolean("tez.runtime.compress", false);
        conf.set("tez.runtime.key.class", IntWritable.class.getName());
        conf.set("tez.runtime.value.class", IntWritable.class.getName());

        Path localDir = new Path(workDir, "local");
        Path srcDir = new Path(workDir, "srcData");
        localFs.mkdirs(localDir);
        localFs.mkdirs(srcDir);
        conf.setStrings("tez.runtime.framework.local.dirs", new String[]{localDir.toString()});

        LocalFileSystem fs = FileSystem.getLocal(conf);
        LocalDirAllocator dirAllocator = new LocalDirAllocator("tez.runtime.framework.local.dirs");
        InputContext task1Context = createMockInputContext(UUID.randomUUID().toString());
        InputContext task2Context = createMockInputContext(UUID.randomUUID().toString());
        ExceptionReporter task1Reporter = Mockito.mock(ExceptionReporter.class);
        ExceptionReporter task2Reporter = Mockito.mock(ExceptionReporter.class);

        // --- manager for task 1: closeOnDiskFile is slowed down in interrupt mode -----------
        MergeManager task1Manager = Mockito.spy(new MergeManager(conf, fs, dirAllocator, task1Context, null, null, null, null, task1Reporter, 2000000L, null, false, -1) {
            @Override
            public synchronized void closeOnDiskFile(FileChunk fileChunk) {
                if (z) {
                    try {
                        Thread.sleep(2000L);
                    } catch (InterruptedException e) {
                        // Restore the flag and skip registering the file when interrupted.
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
                super.closeOnDiskFile(fileChunk);
            }
        });
        task1Manager.configureAndStart();

        // --- manager for task 2 (never started via configureAndStart) ------------------------
        MergeManager task2Manager = Mockito.spy(new MergeManager(conf, fs, dirAllocator, task2Context, (Combiner) null, (TezCounter) null, (TezCounter) null, (TezCounter) null, task2Reporter, 2000000L, (CompressionCodec) null, false, -1));

        // --- shared source files, two index records each -------------------------------------
        SrcFileInfo src1 = createFile(conf, fs, new Path(srcDir, "attemptsrc1.out"), 2, 3, 0);
        SrcFileInfo src2 = createFile(conf, fs, new Path(srcDir, "attemptsrc2.out"), 2, 3, 6);
        InputAttemptIdentifier task1Id1 = new InputAttemptIdentifier(0, 0, src1.path.getName());
        InputAttemptIdentifier task1Id2 = new InputAttemptIdentifier(1, 0, src2.path.getName());
        InputAttemptIdentifier task2Id1 = new InputAttemptIdentifier(0, 0, src1.path.getName());
        InputAttemptIdentifier task2Id2 = new InputAttemptIdentifier(1, 0, src2.path.getName());
        MapOutput task1Output1 = getMapOutputForDirectDiskFetch(task1Id1, src1.path, src1.indexedRecords[0], task1Manager);
        MapOutput task1Output2 = getMapOutputForDirectDiskFetch(task1Id2, src2.path, src2.indexedRecords[0], task1Manager);
        MapOutput task2Output1 = getMapOutputForDirectDiskFetch(task2Id1, src1.path, src1.indexedRecords[1], task2Manager);
        MapOutput task2Output2 = getMapOutputForDirectDiskFetch(task2Id2, src2.path, src2.indexedRecords[1], task2Manager);

        // --- task 1: commit, then merge ------------------------------------------------------
        task1Output1.commit();
        task1Output2.commit();
        Mockito.verify(task1Manager).closeOnDiskFile(task1Output1.getOutputPath());
        Mockito.verify(task1Manager).closeOnDiskFile(task1Output2.getOutputPath());

        LinkedList<FileChunk> task1Inputs = new LinkedList<>();
        task1Inputs.addAll(task1Manager.onDiskMapOutputs);
        task1Manager.onDiskMapOutputs.clear();

        if (z) {
            new Thread(new InterruptingThread(task1Manager.onDiskMerger)).start();
            // An interrupt of the test thread is no longer swallowed here: the method already
            // declares InterruptedException, so it propagates and fails the test visibly.
            Thread.sleep(1000L);
            task1Manager.onDiskMerger.startMerge(Sets.newHashSet(task1Inputs));
            task1Manager.onDiskMerger.waitForMerge();
            Assert.assertNotEquals(1L, task1Manager.onDiskMapOutputs.size());
            // Interrupt mode ends here; the second manager is only checked in the plain run.
            return;
        }

        task1Manager.onDiskMerger.merge(task1Inputs);
        Assert.assertEquals(1L, task1Manager.onDiskMapOutputs.size());

        // --- task 2: commit, then merge ------------------------------------------------------
        task2Output1.commit();
        task2Output2.commit();
        Mockito.verify(task2Manager).closeOnDiskFile(task2Output1.getOutputPath());
        Mockito.verify(task2Manager).closeOnDiskFile(task2Output2.getOutputPath());

        LinkedList<FileChunk> task2Inputs = new LinkedList<>();
        task2Inputs.addAll(task2Manager.onDiskMapOutputs);
        task2Manager.onDiskMapOutputs.clear();
        task2Manager.onDiskMerger.merge(task2Inputs);
        Assert.assertEquals(1L, task2Manager.onDiskMapOutputs.size());

        // --- merged outputs must be distinct and tagged with their own task identifier -------
        Path task1Merged = ((FileChunk) task1Manager.onDiskMapOutputs.iterator().next()).getPath();
        Path task2Merged = ((FileChunk) task2Manager.onDiskMapOutputs.iterator().next()).getPath();
        Assert.assertNotEquals(task1Merged, task2Merged);
        Assert.assertTrue(task1Merged.toString().contains(task1Context.getUniqueIdentifier()));
        Assert.assertTrue(task2Merged.toString().contains(task2Context.getUniqueIdentifier()));
    }

    /**
     * Convenience overload: creates a mocked {@link InputContext} with the given unique
     * identifier and 200 MiB (209715200 bytes) reported as memory available to the task.
     *
     * @param str value the mock returns from {@code getUniqueIdentifier()}
     * @return the configured mock
     */
    private InputContext createMockInputContext(String str) {
        final long defaultTaskMemoryBytes = 200L * 1024L * 1024L;
        return createMockInputContext(str, defaultTaskMemoryBytes);
    }

    /**
     * Creates a Mockito mock of {@link InputContext} with four stubbed getters:
     * a fresh {@link TezCounters} instance (the same one on every call), {@code j} as the total
     * memory available to the task, the fixed source vertex name {@code "srcVertexName"}, and
     * {@code str} as the unique identifier.
     *
     * @param str value returned from {@code getUniqueIdentifier()}
     * @param j value returned from {@code getTotalMemoryAvailableToTask()}
     * @return the configured mock
     */
    private InputContext createMockInputContext(String str, long j) {
        InputContext mockContext = Mockito.mock(InputContext.class);
        Mockito.doReturn(new TezCounters()).when(mockContext).getCounters();
        Mockito.doReturn(Long.valueOf(j)).when(mockContext).getTotalMemoryAvailableToTask();
        Mockito.doReturn("srcVertexName").when(mockContext).getSourceVertexName();
        Mockito.doReturn(str).when(mockContext).getUniqueIdentifier();
        return mockContext;
    }

    /**
     * Writes a source file made of {@code i} back-to-back IFile segments, each holding
     * {@code i2} {@code IntWritable} key/value pairs (key == value). Keys start at {@code i3} and
     * keep counting up across segments. For every segment a {@link TezIndexRecord} is recorded
     * with the stream position at which the segment started plus the writer's raw and compressed
     * lengths.
     *
     * <p>The output stream is closed in a {@code finally} block so that a failure while writing
     * a segment does not leak the open file handle.
     *
     * @param configuration unused by this helper
     * @param fileSystem file system on which the file is created
     * @param path location of the file to create
     * @param i number of segments (and index records) to write
     * @param i2 number of records per segment
     * @param i3 first key value
     * @return the file path together with one index record per segment
     * @throws IOException if creating, writing, or closing the file fails
     */
    private SrcFileInfo createFile(Configuration configuration, FileSystem fileSystem, Path path, int i, int i2, int i3) throws IOException {
        SrcFileInfo srcFileInfo = new SrcFileInfo();
        srcFileInfo.indexedRecords = new TezIndexRecord[i];
        srcFileInfo.path = path;

        int nextKey = i3;
        FSDataOutputStream out = fileSystem.create(path);
        try {
            for (int segment = 0; segment < i; segment++) {
                // Remember where this segment begins before the writer emits anything.
                long segmentStart = out.getPos();
                IFile.Writer writer = new IFile.Writer(new WritableSerialization(), new WritableSerialization(), out, IntWritable.class, IntWritable.class, (CompressionCodec) null, (TezCounter) null, (TezCounter) null);
                for (int record = 0; record < i2; record++) {
                    writer.append(new IntWritable(nextKey), new IntWritable(nextKey));
                    nextKey++;
                }
                // Lengths are read after close(); the shared stream stays open for the next
                // segment.
                writer.close();
                srcFileInfo.indexedRecords[segment] = new TezIndexRecord(segmentStart, writer.getRawLength(), writer.getCompressedLength());
            }
        } finally {
            out.close();
        }
        return srcFileInfo;
    }

    /**
     * Wraps one segment of an existing local file as a local-disk {@link MapOutput} by
     * delegating to {@link MapOutput#createLocalDiskMapOutput}, using the start offset and part
     * length taken from the supplied index record.
     *
     * @param inputAttemptIdentifier identifier of the input the segment belongs to
     * @param path file containing the segment
     * @param tezIndexRecord index record supplying the segment's start offset and part length
     * @param mergeManager merge manager the created output is bound to
     * @return the created map output
     * @throws IOException if the map output cannot be created
     */
    private static MapOutput getMapOutputForDirectDiskFetch(InputAttemptIdentifier inputAttemptIdentifier, Path path, TezIndexRecord tezIndexRecord, MergeManager mergeManager) throws IOException {
        final long segmentOffset = tezIndexRecord.getStartOffset();
        final long segmentLength = tezIndexRecord.getPartLength();
        return MapOutput.createLocalDiskMapOutput(inputAttemptIdentifier, mergeManager, path, segmentOffset, segmentLength, true);
    }

    static {
        localFs = null;
        workDir = null;
        try {
            defaultConf.set("fs.defaultFS", "file:///");
            localFs = FileSystem.getLocal(defaultConf);
            workDir = new Path(new Path(System.getProperty("test.build.data", "/tmp")), TestMergeManager.class.getSimpleName());
            workDir = localFs.makeQualified(workDir);
            localFs.mkdirs(workDir);
            LOG.info("Using workDir: " + workDir);
        } catch (IOException e) {
            throw new RuntimeException("init failure", e);
        }
    }
}
