/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapreduce.task.reduce;

import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.IFile;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MROutputFiles;
import org.apache.hadoop.mapred.MapOutputFile;
import org.apache.hadoop.mapred.Merger;
import org.apache.hadoop.mapred.RawKeyValueIterator;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.security.IntermediateEncryptedStream;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.mapreduce.task.reduce.InMemoryMapOutput;
import org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl;
import org.apache.hadoop.mapreduce.task.reduce.MergeThread;
import org.apache.hadoop.mapreduce.util.MRJobConfUtil;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.Progressable;
import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/**
 * Tests for the reduce-side merge code: merging in-memory map outputs to disk,
 * merging on-disk map outputs together, and the progress reported by
 * {@link Merger#merge} while iterating over mocked segments.
 */
public class TestMerger {
    private static File testRootDir;

    /** Gives each test method its own working directory name. */
    @Rule
    public TestName unitTestName = new TestName();
    private File unitTestDir;
    private JobConf jobConf;
    private FileSystem fs;

    /**
     * Creates the root directory shared by all test methods of this class.
     *
     * @throws Exception if the test root directory cannot be set up
     */
    @BeforeClass
    public static void setupClass() throws Exception {
        testRootDir = GenericTestUtils.setupTestRootDir(TestMerger.class);
    }

    /**
     * Builds a fresh {@link JobConf} whose local directories point at a
     * per-test directory, and opens the local file system for it.
     *
     * @throws IOException if the local file system cannot be obtained
     */
    @Before
    public void setup() throws IOException {
        this.unitTestDir = new File(testRootDir, this.unitTestName.getMethodName());
        this.unitTestDir.mkdirs();
        this.jobConf = new JobConf();
        MRJobConfUtil.setLocalDirectoriesConfigForTesting(this.jobConf, this.unitTestDir);
        this.jobConf.set("mapreduce.framework.name", "local");
        this.fs = FileSystem.getLocal(this.jobConf);
    }

    /**
     * Re-runs {@link #testInMemoryAndOnDiskMerger()} after enabling the
     * encrypted-intermediate-data test configuration and registering a
     * 16-byte spill key with the current user's credentials.
     *
     * @throws Throwable if the underlying merge test fails
     */
    @Test
    public void testEncryptedMerger() throws Throwable {
        MRJobConfUtil.initEncryptedIntermediateConfigsForTesting(this.jobConf);
        Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
        TokenCache.setEncryptedSpillKey(new byte[16], credentials);
        UserGroupInformation.getCurrentUser().addCredentials(credentials);
        this.testInMemoryAndOnDiskMerger();
    }

    /**
     * Runs two in-memory merges (each over two map outputs), checks the two
     * resulting on-disk files, then merges those files with an on-disk merger
     * owned by a second merge manager and checks the combined result.
     *
     * @throws Throwable if any merge step or assertion fails
     */
    @Test
    public void testInMemoryAndOnDiskMerger() throws Throwable {
        JobID jobId = new JobID("a", 0);
        TaskAttemptID reduceId1 = new TaskAttemptID(new TaskID(jobId, TaskType.REDUCE, 0), 0);
        TaskAttemptID mapId1 = new TaskAttemptID(new TaskID(jobId, TaskType.MAP, 1), 0);
        TaskAttemptID mapId2 = new TaskAttemptID(new TaskID(jobId, TaskType.MAP, 2), 0);
        LocalDirAllocator lda = new LocalDirAllocator("mapreduce.cluster.local.dir");
        MergeManagerImpl<Text, Text> mergeManager = this.createMergeManager(reduceId1, lda);

        // First pair of map outputs. "banana" belongs to the second map so that
        // the in-memory merge really combines two non-empty outputs.
        Map<String, String> map1 = new TreeMap<String, String>();
        map1.put("apple", "disgusting");
        map1.put("carrot", "delicious");
        Map<String, String> map2 = new TreeMap<String, String>();
        map2.put("banana", "pretty good");
        List<InMemoryMapOutput<Text, Text>> mapOutputs1 = new ArrayList<InMemoryMapOutput<Text, Text>>();
        mapOutputs1.add(this.createInMemoryMapOutput(mapId1, mergeManager, map1));
        mapOutputs1.add(this.createInMemoryMapOutput(mapId2, mergeManager, map2));
        MergeThread<InMemoryMapOutput<Text, Text>, Text, Text> inMemoryMerger =
                mergeManager.createInMemoryMerger();
        inMemoryMerger.merge(mapOutputs1);
        Assert.assertEquals(1L, mergeManager.onDiskMapOutputs.size());

        // Second pair of map outputs, merged by a second in-memory merger of
        // the same manager.
        TaskAttemptID reduceId2 = new TaskAttemptID(new TaskID(jobId, TaskType.REDUCE, 3), 0);
        TaskAttemptID mapId3 = new TaskAttemptID(new TaskID(jobId, TaskType.MAP, 4), 0);
        TaskAttemptID mapId4 = new TaskAttemptID(new TaskID(jobId, TaskType.MAP, 5), 0);
        Map<String, String> map3 = new TreeMap<String, String>();
        map3.put("apple", "awesome");
        map3.put("carrot", "amazing");
        Map<String, String> map4 = new TreeMap<String, String>();
        map4.put("banana", "bla");
        List<InMemoryMapOutput<Text, Text>> mapOutputs2 = new ArrayList<InMemoryMapOutput<Text, Text>>();
        mapOutputs2.add(this.createInMemoryMapOutput(mapId3, mergeManager, map3));
        mapOutputs2.add(this.createInMemoryMapOutput(mapId4, mergeManager, map4));
        MergeThread<InMemoryMapOutput<Text, Text>, Text, Text> inMemoryMerger2 =
                mergeManager.createInMemoryMerger();
        inMemoryMerger2.merge(mapOutputs2);
        Assert.assertEquals(2L, mergeManager.onDiskMapOutputs.size());

        // Read both on-disk files back, in the iteration order of the manager's
        // on-disk set, remembering the paths for the on-disk merge below.
        List<MergeManagerImpl.CompressAwarePath> paths = new ArrayList<MergeManagerImpl.CompressAwarePath>();
        List<String> keys = new ArrayList<String>();
        List<String> values = new ArrayList<String>();
        for (MergeManagerImpl.CompressAwarePath onDiskPath : mergeManager.onDiskMapOutputs) {
            this.readOnDiskMapOutput(this.jobConf, this.fs, onDiskPath, keys, values);
            paths.add(onDiskPath);
        }
        Assertions.assertThat(keys).isEqualTo(Arrays.asList("apple", "banana", "carrot", "apple", "banana", "carrot"));
        Assertions.assertThat(values).isEqualTo(Arrays.asList("awesome", "bla", "amazing", "disgusting", "pretty good", "delicious"));
        mergeManager.close();

        // Merge the two on-disk files with a fresh manager.
        mergeManager = this.createMergeManager(reduceId2, lda);
        MergeThread<MergeManagerImpl.CompressAwarePath, Text, Text> onDiskMerger =
                mergeManager.createOnDiskMerger();
        onDiskMerger.merge(paths);
        Assert.assertEquals(1L, mergeManager.onDiskMapOutputs.size());
        keys = new ArrayList<String>();
        values = new ArrayList<String>();
        this.readOnDiskMapOutput(this.jobConf, this.fs, mergeManager.onDiskMapOutputs.iterator().next(), keys, values);
        Assertions.assertThat(keys).isEqualTo(Arrays.asList("apple", "apple", "banana", "banana", "carrot", "carrot"));
        Assertions.assertThat(values).isEqualTo(Arrays.asList("awesome", "disgusting", "pretty good", "bla", "amazing", "delicious"));
        mergeManager.close();

        // Closing the manager must leave none of its collections populated.
        Assert.assertEquals(0L, mergeManager.inMemoryMapOutputs.size());
        Assert.assertEquals(0L, mergeManager.inMemoryMergedMapOutputs.size());
        Assert.assertEquals(0L, mergeManager.onDiskMapOutputs.size());
    }

    /**
     * Creates a merge manager for the given reduce attempt using this test's
     * configuration and file system; every optional collaborator is null.
     *
     * @param reduceId reduce task attempt the manager belongs to
     * @param lda allocator used to pick local directories
     * @return a new merge manager
     */
    private MergeManagerImpl<Text, Text> createMergeManager(TaskAttemptID reduceId, LocalDirAllocator lda) {
        return new MergeManagerImpl<Text, Text>(reduceId, this.jobConf, this.fs, lda, Reporter.NULL,
                null, null, null, null, null, null, null, new Progress(), new MROutputFiles());
    }

    /**
     * Serializes the given records and copies them into a new in-memory map
     * output sized to exactly hold them.
     *
     * @param mapId map task attempt the output is attributed to
     * @param mergeManager manager the output is created for
     * @param records key/value pairs to serialize, written in map iteration order
     * @return the populated in-memory map output
     * @throws IOException if serialization fails
     */
    private InMemoryMapOutput<Text, Text> createInMemoryMapOutput(TaskAttemptID mapId,
            MergeManagerImpl<Text, Text> mergeManager, Map<String, String> records) throws IOException {
        byte[] serialized = this.writeMapOutput(this.jobConf, records);
        InMemoryMapOutput<Text, Text> mapOutput = new InMemoryMapOutput<Text, Text>(
                this.jobConf, mapId, mergeManager, serialized.length, null, true);
        System.arraycopy(serialized, 0, mapOutput.getMemory(), 0, serialized.length);
        return mapOutput;
    }

    /**
     * Writes the given pairs as {@link Text} keys and values through an
     * {@link IFile.Writer} into a byte array.
     *
     * @param conf configuration passed to the writer
     * @param keysToValues pairs to write, in the map's iteration order
     * @return the bytes produced by the writer
     * @throws IOException if writing fails
     */
    private byte[] writeMapOutput(Configuration conf, Map<String, String> keysToValues) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        FSDataOutputStream fsdos = new FSDataOutputStream(baos, null);
        IFile.Writer<Text, Text> writer = new IFile.Writer<Text, Text>(conf, fsdos, Text.class, Text.class, null, null);
        for (Map.Entry<String, String> entry : keysToValues.entrySet()) {
            writer.append(new Text(entry.getKey()), new Text(entry.getValue()));
        }
        writer.close();
        return baos.toByteArray();
    }

    /**
     * Reads every record of an on-disk map output and appends the decoded keys
     * and values to the supplied lists. The stream is passed through
     * {@link IntermediateEncryptedStream#wrapIfNecessary} before being read.
     *
     * @param conf configuration used to open and read the file
     * @param fs file system holding the file
     * @param path location of the on-disk map output
     * @param keys list receiving the keys, in file order
     * @param values list receiving the values, in file order
     * @throws IOException if the file cannot be opened or read
     */
    private void readOnDiskMapOutput(Configuration conf, FileSystem fs, Path path, List<String> keys, List<String> values) throws IOException {
        FSDataInputStream in = IntermediateEncryptedStream.wrapIfNecessary(conf, fs.open(path), path);
        IFile.Reader<Text, Text> reader = new IFile.Reader<Text, Text>(conf, in, fs.getFileStatus(path).getLen(), null, null);
        try {
            DataInputBuffer keyBuff = new DataInputBuffer();
            DataInputBuffer valueBuff = new DataInputBuffer();
            Text key = new Text();
            Text value = new Text();
            while (reader.nextRawKey(keyBuff)) {
                key.readFields(keyBuff);
                keys.add(key.toString());
                reader.nextRawValue(valueBuff);
                value.readFields(valueBuff);
                values.add(value.toString());
            }
        } finally {
            // Release the reader (and the stream it wraps) even when a read fails.
            reader.close();
        }
    }

    /**
     * Checks merge progress over segments built with a raw data length.
     *
     * @throws IOException if the merge fails
     */
    @Test
    public void testCompressed() throws IOException {
        this.testMergeShouldReturnProperProgress(this.getCompressedSegments());
    }

    /**
     * Checks merge progress over segments built without a raw data length.
     *
     * @throws IOException if the merge fails
     */
    @Test
    public void testUncompressed() throws IOException {
        this.testMergeShouldReturnProperProgress(this.getUncompressedSegments());
    }

    /**
     * Merges the given segments with a merge factor of 2 and asserts the
     * progress value reported before the first record, after each of the six
     * records, and after the iterator is exhausted.
     *
     * @param segments segments to merge; the expectations assume two segments
     *                 of three records each
     * @throws IOException if the merge fails
     */
    public void testMergeShouldReturnProperProgress(List<Merger.Segment<Text, Text>> segments) throws IOException {
        Path tmpDir = new Path(this.jobConf.get("mapreduce.cluster.temp.dir"), "localpath");
        Class keyClass = this.jobConf.getMapOutputKeyClass();
        Class valueClass = this.jobConf.getMapOutputValueClass();
        RawComparator comparator = this.jobConf.getOutputKeyComparator();
        Counters.Counter readsCounter = new Counters.Counter();
        Counters.Counter writesCounter = new Counters.Counter();
        Progress mergePhase = new Progress();
        RawKeyValueIterator mergeQueue = Merger.merge(this.jobConf, this.fs, keyClass, valueClass, segments, 2,
                tmpDir, comparator, this.getReporter(), readsCounter, writesCounter, mergePhase);

        float epsilon = 1.0E-5f;
        // Progress expected after each successful call to next(), in order.
        float[] progressAfterRecord = {0.33333334f, 0.5f, 0.6666667f, 0.6666667f, 0.8333333f, 1.0f};

        Assert.assertEquals(0.33333334f, mergeQueue.getProgress().get(), epsilon);
        for (int record = 0; record < progressAfterRecord.length; ++record) {
            Assert.assertTrue(mergeQueue.next());
            Assert.assertEquals(progressAfterRecord[record], mergeQueue.getProgress().get(), epsilon);
        }

        // Once exhausted, progress stays at 1 and no key/value is exposed.
        Assert.assertFalse(mergeQueue.next());
        Assert.assertEquals(1.0f, mergeQueue.getProgress().get(), epsilon);
        Assert.assertNull(mergeQueue.getKey());
        Assert.assertEquals(0L, mergeQueue.getValue().getData().length);
    }

    /**
     * @return a {@link Progressable} whose {@code progress()} does nothing
     */
    private Progressable getReporter() {
        return new Progressable() {
            @Override
            public void progress() {
            }
        };
    }

    /**
     * @return two segments created by {@link #getUncompressedSegment(int)}
     * @throws IOException if a mocked reader cannot be set up
     */
    private List<Merger.Segment<Text, Text>> getUncompressedSegments() throws IOException {
        List<Merger.Segment<Text, Text>> segments = new ArrayList<Merger.Segment<Text, Text>>();
        for (int i = 0; i < 2; ++i) {
            segments.add(this.getUncompressedSegment(i));
        }
        return segments;
    }

    /**
     * @return two segments created by {@link #getCompressedSegment(int)}
     * @throws IOException if a mocked reader cannot be set up
     */
    private List<Merger.Segment<Text, Text>> getCompressedSegments() throws IOException {
        List<Merger.Segment<Text, Text>> segments = new ArrayList<Merger.Segment<Text, Text>>();
        for (int i = 0; i < 2; ++i) {
            segments.add(this.getCompressedSegment(i));
        }
        return segments;
    }

    /**
     * @param i index used to name the segment's keys and values
     * @return a segment over a mocked reader, with no raw data length given
     * @throws IOException if the mocked reader cannot be set up
     */
    private Merger.Segment<Text, Text> getUncompressedSegment(int i) throws IOException {
        return new Merger.Segment<Text, Text>(this.getReader(i, false), false);
    }

    /**
     * @param i index used to name the segment's keys and values
     * @return a segment over a mocked reader, with a raw data length of 3000
     * @throws IOException if the mocked reader cannot be set up
     */
    private Merger.Segment<Text, Text> getCompressedSegment(int i) throws IOException {
        return new Merger.Segment<Text, Text>(this.getReader(i, true), false, 3000L);
    }

    /**
     * Builds a mocked {@link IFile.Reader} that reports a length of 30,
     * positions 0, 10 and 20 on successive calls, and serves keys and values
     * from {@link #getKeyAnswer} and {@link #getValueAnswer}.
     *
     * @param i index appended to "Segment" to name the segment
     * @param isCompressedInput forwarded to {@link #getKeyAnswer}
     * @return the mocked reader
     * @throws IOException declared by the stubbed reader methods
     */
    @SuppressWarnings("unchecked") // Mockito cannot produce a parameterized mock type.
    private IFile.Reader<Text, Text> getReader(int i, boolean isCompressedInput) throws IOException {
        IFile.Reader<Text, Text> readerMock = Mockito.mock(IFile.Reader.class);
        Mockito.when(readerMock.getLength()).thenReturn(30L);
        Mockito.when(readerMock.getPosition()).thenReturn(0L).thenReturn(10L).thenReturn(20L);
        Mockito.when(readerMock.nextRawKey(ArgumentMatchers.any(DataInputBuffer.class)))
                .thenAnswer(this.getKeyAnswer("Segment" + i, isCompressedInput));
        Mockito.doAnswer(this.getValueAnswer("Segment" + i)).when(readerMock)
                .nextRawValue(ArgumentMatchers.any(DataInputBuffer.class));
        return readerMock;
    }

    /**
     * Answer for {@code nextRawKey}: returns true for the first three calls,
     * each time adding 10 (or 1000 when {@code isCompressedInput}) to the
     * mock's {@code bytesRead} and resetting the supplied buffer to the first
     * 20 bytes of the generated key text; returns false on the fourth call.
     *
     * @param segmentName name embedded in the generated key text
     * @param isCompressedInput whether to scale the bytes-read increment by 100
     * @return the stateful answer
     */
    private Answer<?> getKeyAnswer(final String segmentName, final boolean isCompressedInput) {
        return new Answer<Object>() {
            int i = 0;

            @Override
            public Boolean answer(InvocationOnMock invocation) {
                if (this.i++ == 3) {
                    return false;
                }
                IFile.Reader<?, ?> mock = (IFile.Reader<?, ?>) invocation.getMock();
                int multiplier = isCompressedInput ? 100 : 1;
                mock.bytesRead += (long) (10 * multiplier);
                DataInputBuffer key = (DataInputBuffer) invocation.getArguments()[0];
                key.reset(("Segment Key " + segmentName + this.i).getBytes(), 20);
                return true;
            }
        };
    }

    /**
     * Answer for {@code nextRawValue}: resets the supplied buffer to the first
     * 20 bytes of the generated value text. The counter {@code i} is never
     * advanced, so the generated text always ends in 0.
     *
     * @param segmentName name embedded in the generated value text
     * @return the answer
     */
    private Answer<?> getValueAnswer(final String segmentName) {
        return new Answer<Void>() {
            int i = 0;

            @Override
            public Void answer(InvocationOnMock invocation) {
                DataInputBuffer value = (DataInputBuffer) invocation.getArguments()[0];
                value.reset(("Segment Value " + segmentName + this.i).getBytes(), 20);
                return null;
            }
        };
    }
}

