package org.apache.tez.runtime.library.common.shuffle;

import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.BitSet;
import java.util.LinkedList;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.runtime.api.InputContext;
import org.apache.tez.runtime.api.OutputContext;
import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
import org.apache.tez.runtime.library.partitioner.HashPartitioner;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils.class */
public class TestShuffleUtils {
    private OutputContext outputContext;
    private Configuration conf;
    private FileSystem localFs;
    private Path workingDir;

    /* loaded from: input_file:org/apache/tez/runtime/library/common/shuffle/TestShuffleUtils$ConfigurableCodecForTest.class */
    public static class ConfigurableCodecForTest implements CompressionCodec, Configurable {
        public Compressor createCompressor() {
            return null;
        }

        public Decompressor createDecompressor() {
            return null;
        }

        public CompressionInputStream createInputStream(InputStream inputStream) throws IOException {
            return null;
        }

        public CompressionInputStream createInputStream(InputStream inputStream, Decompressor decompressor) throws IOException {
            return null;
        }

        public CompressionOutputStream createOutputStream(OutputStream outputStream) throws IOException {
            return null;
        }

        public CompressionOutputStream createOutputStream(OutputStream outputStream, Compressor compressor) throws IOException {
            return null;
        }

        public Class<? extends Compressor> getCompressorType() {
            return null;
        }

        public Class<? extends Decompressor> getDecompressorType() {
            return null;
        }

        public String getDefaultExtension() {
            return null;
        }

        public Configuration getConf() {
            return null;
        }

        public void setConf(Configuration configuration) {
        }
    }

    private InputContext createTezInputContext() {
        ApplicationId newInstance = ApplicationId.newInstance(1L, 1);
        InputContext inputContext = (InputContext) Mockito.mock(InputContext.class);
        ((InputContext) Mockito.doReturn(newInstance).when(inputContext)).getApplicationId();
        ((InputContext) Mockito.doReturn("sourceVertex").when(inputContext)).getSourceVertexName();
        Mockito.when(inputContext.getCounters()).thenReturn(new TezCounters());
        return inputContext;
    }

    private OutputContext createTezOutputContext() throws IOException {
        ApplicationId.newInstance(1L, 1);
        OutputContext outputContext = (OutputContext) Mockito.mock(OutputContext.class);
        ExecutionContextImpl executionContextImpl = (ExecutionContextImpl) Mockito.mock(ExecutionContextImpl.class);
        ((ExecutionContextImpl) Mockito.doReturn(org.apache.tez.runtime.library.common.shuffle.orderedgrouped.TestFetcher.HOST).when(executionContextImpl)).getHostName();
        ((OutputContext) Mockito.doReturn(executionContextImpl).when(outputContext)).getExecutionContext();
        DataOutputBuffer dataOutputBuffer = new DataOutputBuffer();
        dataOutputBuffer.writeInt(80);
        ((OutputContext) Mockito.doReturn(ByteBuffer.wrap(dataOutputBuffer.getData())).when(outputContext)).getServiceProviderMetaData(this.conf.get("tez.am.shuffle.auxiliary-service.id", "mapreduce_shuffle"));
        ((OutputContext) Mockito.doReturn(1).when(outputContext)).getTaskVertexIndex();
        ((OutputContext) Mockito.doReturn(1).when(outputContext)).getOutputIndex();
        ((OutputContext) Mockito.doReturn(0).when(outputContext)).getDAGAttemptNumber();
        ((OutputContext) Mockito.doReturn("destVertex").when(outputContext)).getDestinationVertexName();
        Mockito.when(outputContext.getCounters()).thenReturn(new TezCounters());
        return outputContext;
    }

    @Before
    public void setup() throws Exception {
        this.conf = new Configuration();
        this.outputContext = createTezOutputContext();
        this.conf.set("fs.defaultFS", "file:///");
        this.localFs = FileSystem.getLocal(this.conf);
        this.workingDir = new Path(new Path(System.getProperty("test.build.data", "/tmp")), TestShuffleUtils.class.getName()).makeQualified(this.localFs.getUri(), this.localFs.getWorkingDirectory());
        String path = this.workingDir.toString();
        this.conf.set("tez.runtime.key.class", Text.class.getName());
        this.conf.set("tez.runtime.value.class", Text.class.getName());
        this.conf.set("tez.runtime.partitioner.class", HashPartitioner.class.getName());
        this.conf.setStrings("tez.runtime.framework.local.dirs", new String[]{path});
    }

    private Path createIndexFile(int i, boolean z) throws IOException {
        Path path = new Path(this.workingDir, "file.index.out");
        TezSpillRecord tezSpillRecord = new TezSpillRecord(i);
        long j = 0;
        for (int i2 = 0; i2 < i; i2++) {
            long nextLong = ThreadLocalRandom.current().nextLong(100L, 200L);
            if (i2 % 2 == 0 || z) {
                nextLong = 0;
            }
            TezIndexRecord tezIndexRecord = new TezIndexRecord(j, nextLong, 200L);
            j += 200;
            tezSpillRecord.putIndex(tezIndexRecord, i2);
        }
        tezSpillRecord.writeToFile(path, this.conf, FileSystem.getLocal(this.conf).getRaw());
        return path;
    }

    @Test
    public void testGenerateOnSpillEvent() throws Exception {
        LinkedList newLinkedList = Lists.newLinkedList();
        ShuffleUtils.generateEventOnSpill(newLinkedList, false, false, this.outputContext, 0, new TezSpillRecord(createIndexFile(10, false), this.conf), 10, true, "/attempt_x_y_0/file.out", (long[]) null, false, this.conf.get("tez.am.shuffle.auxiliary-service.id", "mapreduce_shuffle"), TezCommonUtils.newBestCompressionDeflater());
        Assert.assertTrue(newLinkedList.size() == 1);
        Assert.assertTrue(newLinkedList.get(0) instanceof CompositeDataMovementEvent);
        CompositeDataMovementEvent compositeDataMovementEvent = (CompositeDataMovementEvent) newLinkedList.get(0);
        Assert.assertTrue(compositeDataMovementEvent.getCount() == 10);
        Assert.assertTrue(compositeDataMovementEvent.getSourceIndexStart() == 0);
        ShuffleUserPayloads.DataMovementEventPayloadProto parseFrom = ShuffleUserPayloads.DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(compositeDataMovementEvent.getUserPayload()));
        Assert.assertTrue(parseFrom.getSpillId() == 0);
        Assert.assertTrue(parseFrom.hasLastEvent() && !parseFrom.getLastEvent());
        BitSet fromByteArray = TezUtilsInternal.fromByteArray(TezCommonUtils.decompressByteStringToByteArray(parseFrom.getEmptyPartitions()));
        Assert.assertTrue("emptyPartitionBitSet cardinality (expecting 5) = " + fromByteArray.cardinality(), fromByteArray.cardinality() == 5);
        newLinkedList.clear();
    }

    @Test
    public void testGenerateOnSpillEvent_With_FinalMerge() throws Exception {
        LinkedList newLinkedList = Lists.newLinkedList();
        ShuffleUtils.generateEventOnSpill(newLinkedList, true, true, this.outputContext, 0, new TezSpillRecord(createIndexFile(10, false), this.conf), 10, true, "/attempt_x_y_0/file.out", (long[]) null, false, this.conf.get("tez.am.shuffle.auxiliary-service.id", "mapreduce_shuffle"), TezCommonUtils.newBestCompressionDeflater());
        Assert.assertTrue(newLinkedList.size() == 2);
        Assert.assertTrue(newLinkedList.get(0) instanceof VertexManagerEvent);
        Assert.assertTrue(newLinkedList.get(1) instanceof CompositeDataMovementEvent);
        CompositeDataMovementEvent compositeDataMovementEvent = (CompositeDataMovementEvent) newLinkedList.get(1);
        Assert.assertTrue(compositeDataMovementEvent.getCount() == 10);
        Assert.assertTrue(compositeDataMovementEvent.getSourceIndexStart() == 0);
        ShuffleUserPayloads.DataMovementEventPayloadProto parseFrom = ShuffleUserPayloads.DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(compositeDataMovementEvent.getUserPayload()));
        Assert.assertFalse(parseFrom.hasSpillId());
        Assert.assertFalse(parseFrom.hasLastEvent() || parseFrom.getLastEvent());
        BitSet fromByteArray = TezUtilsInternal.fromByteArray(TezCommonUtils.decompressByteStringToByteArray(parseFrom.getEmptyPartitions()));
        Assert.assertTrue("emptyPartitionBitSet cardinality (expecting 5) = " + fromByteArray.cardinality(), fromByteArray.cardinality() == 5);
    }

    @Test
    public void testGenerateOnSpillEvent_With_All_EmptyPartitions() throws Exception {
        LinkedList newLinkedList = Lists.newLinkedList();
        ShuffleUtils.generateEventOnSpill(newLinkedList, false, true, this.outputContext, 0, new TezSpillRecord(createIndexFile(10, true), this.conf), 10, true, "/attempt_x_y_0/file.out", (long[]) null, false, this.conf.get("tez.am.shuffle.auxiliary-service.id", "mapreduce_shuffle"), TezCommonUtils.newBestCompressionDeflater());
        Assert.assertTrue(newLinkedList.size() == 2);
        Assert.assertTrue(newLinkedList.get(0) instanceof VertexManagerEvent);
        Assert.assertTrue(newLinkedList.get(1) instanceof CompositeDataMovementEvent);
        CompositeDataMovementEvent compositeDataMovementEvent = (CompositeDataMovementEvent) newLinkedList.get(1);
        Assert.assertTrue(compositeDataMovementEvent.getCount() == 10);
        Assert.assertTrue(compositeDataMovementEvent.getSourceIndexStart() == 0);
        ShuffleUserPayloads.DataMovementEventPayloadProto parseFrom = ShuffleUserPayloads.DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(compositeDataMovementEvent.getUserPayload()));
        Assert.assertTrue(parseFrom.getSpillId() == 0);
        Assert.assertTrue(parseFrom.hasLastEvent() && parseFrom.getLastEvent());
        Assert.assertTrue(parseFrom.getPathComponent().equals(""));
        BitSet fromByteArray = TezUtilsInternal.fromByteArray(TezCommonUtils.decompressByteStringToByteArray(parseFrom.getEmptyPartitions()));
        Assert.assertTrue("emptyPartitionBitSet cardinality (expecting 10) = " + fromByteArray.cardinality(), fromByteArray.cardinality() == 10);
    }

    @Test
    public void testInternalErrorTranslation() throws Exception {
        CompressionInputStream compressionInputStream = (CompressionInputStream) Mockito.mock(CompressionInputStream.class);
        Mockito.when(Integer.valueOf(compressionInputStream.read((byte[]) Mockito.any(byte[].class), Mockito.anyInt(), Mockito.anyInt()))).thenThrow(new Throwable[]{new InternalError("codec failure")});
        Decompressor decompressor = (Decompressor) Mockito.mock(Decompressor.class);
        CompressionCodec compressionCodec = (CompressionCodec) Mockito.mock(ConfigurableCodecForTest.class);
        Mockito.when(((ConfigurableCodecForTest) compressionCodec).getConf()).thenReturn(Mockito.mock(Configuration.class));
        Mockito.when(compressionCodec.createDecompressor()).thenReturn(decompressor);
        Mockito.when(compressionCodec.createInputStream((InputStream) Mockito.any(InputStream.class), (Decompressor) Mockito.any(Decompressor.class))).thenReturn(compressionInputStream);
        try {
            ShuffleUtils.shuffleToMemory(new byte[1024], new ByteArrayInputStream(new byte[]{84, 73, 70, 1}), 1024, 128, compressionCodec, false, 0, (Logger) Mockito.mock(Logger.class), (InputAttemptIdentifier) null);
            Assert.fail("shuffle was supposed to throw!");
        } catch (IOException e) {
            Assert.assertTrue(e.getCause() instanceof InternalError);
            Assert.assertTrue(e.getMessage().contains("codec failure"));
        }
    }

    @Test
    public void testExceptionTranslation() throws Exception {
        CompressionInputStream compressionInputStream = (CompressionInputStream) Mockito.mock(CompressionInputStream.class);
        Mockito.when(Integer.valueOf(compressionInputStream.read((byte[]) Mockito.any(byte[].class), Mockito.anyInt(), Mockito.anyInt()))).thenThrow(new Throwable[]{new IllegalArgumentException("codec failure")});
        Decompressor decompressor = (Decompressor) Mockito.mock(Decompressor.class);
        CompressionCodec compressionCodec = (CompressionCodec) Mockito.mock(ConfigurableCodecForTest.class);
        Mockito.when(((ConfigurableCodecForTest) compressionCodec).getConf()).thenReturn(Mockito.mock(Configuration.class));
        Mockito.when(compressionCodec.createDecompressor()).thenReturn(decompressor);
        Mockito.when(compressionCodec.createInputStream((InputStream) Mockito.any(InputStream.class), (Decompressor) Mockito.any(Decompressor.class))).thenReturn(compressionInputStream);
        byte[] bArr = {84, 73, 70, 1};
        try {
            ShuffleUtils.shuffleToMemory(new byte[1024], new ByteArrayInputStream(bArr), 1024, 128, compressionCodec, false, 0, (Logger) Mockito.mock(Logger.class), (InputAttemptIdentifier) null);
            Assert.fail("shuffle was supposed to throw!");
        } catch (IOException e) {
            Assert.assertTrue(e.getCause() instanceof IllegalArgumentException);
            Assert.assertTrue(e.getMessage().contains("codec failure"));
        }
        CompressionInputStream compressionInputStream2 = (CompressionInputStream) Mockito.mock(CompressionInputStream.class);
        Mockito.when(Integer.valueOf(compressionInputStream2.read((byte[]) Mockito.any(byte[].class), Mockito.anyInt(), Mockito.anyInt()))).thenThrow(new Throwable[]{new SocketTimeoutException("codec failure")});
        CompressionCodec compressionCodec2 = (CompressionCodec) Mockito.mock(ConfigurableCodecForTest.class);
        Mockito.when(((ConfigurableCodecForTest) compressionCodec2).getConf()).thenReturn(Mockito.mock(Configuration.class));
        Mockito.when(compressionCodec2.createDecompressor()).thenReturn(decompressor);
        Mockito.when(compressionCodec2.createInputStream((InputStream) Mockito.any(InputStream.class), (Decompressor) Mockito.any(Decompressor.class))).thenReturn(compressionInputStream2);
        try {
            ShuffleUtils.shuffleToMemory(new byte[1024], new ByteArrayInputStream(bArr), 1024, 128, compressionCodec2, false, 0, (Logger) Mockito.mock(Logger.class), (InputAttemptIdentifier) null);
            Assert.fail("shuffle was supposed to throw!");
        } catch (IOException e2) {
            Assert.assertTrue(e2 instanceof SocketTimeoutException);
            Assert.assertTrue(e2.getMessage().contains("codec failure"));
        }
        CompressionInputStream compressionInputStream3 = (CompressionInputStream) Mockito.mock(CompressionInputStream.class);
        Mockito.when(Integer.valueOf(compressionInputStream3.read((byte[]) Mockito.any(byte[].class), Mockito.anyInt(), Mockito.anyInt()))).thenThrow(new Throwable[]{new InternalError("codec failure")});
        CompressionCodec compressionCodec3 = (CompressionCodec) Mockito.mock(ConfigurableCodecForTest.class);
        Mockito.when(((ConfigurableCodecForTest) compressionCodec3).getConf()).thenReturn(Mockito.mock(Configuration.class));
        Mockito.when(compressionCodec3.createDecompressor()).thenReturn(decompressor);
        Mockito.when(compressionCodec3.createInputStream((InputStream) Mockito.any(InputStream.class), (Decompressor) Mockito.any(Decompressor.class))).thenReturn(compressionInputStream3);
        try {
            ShuffleUtils.shuffleToMemory(new byte[1024], new ByteArrayInputStream(bArr), 1024, 128, compressionCodec3, false, 0, (Logger) Mockito.mock(Logger.class), (InputAttemptIdentifier) null);
            Assert.fail("shuffle was supposed to throw!");
        } catch (IOException e3) {
            Assert.assertTrue(e3.getCause() instanceof InternalError);
            Assert.assertTrue(e3.getMessage().contains("codec failure"));
        }
    }

    @Test
    public void testShuffleToDiskChecksum() throws Exception {
        byte[] bArr = new byte[1000];
        Arrays.fill(bArr, (byte) 0);
        ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bArr);
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        ShuffleUtils.shuffleToDisk(byteArrayOutputStream, "somehost", byteArrayInputStream, bArr.length, 2000L, (Logger) Mockito.mock(Logger.class), (InputAttemptIdentifier) null, false, 0, false);
        Assert.assertArrayEquals(bArr, byteArrayOutputStream.toByteArray());
        byteArrayInputStream.reset();
        try {
            ShuffleUtils.shuffleToDisk((OutputStream) Mockito.mock(OutputStream.class), "somehost", byteArrayInputStream, bArr.length, 2000L, (Logger) Mockito.mock(Logger.class), (InputAttemptIdentifier) null, false, 0, true);
            Assert.fail("shuffle was supposed to throw!");
        } catch (IOException e) {
        }
    }

    @Test
    public void testFetchStatsLogger() throws Exception {
        Logger logger = (Logger) Mockito.mock(Logger.class);
        Logger logger2 = (Logger) Mockito.mock(Logger.class);
        ShuffleUtils.FetchStatsLogger fetchStatsLogger = new ShuffleUtils.FetchStatsLogger(logger, logger2);
        InputAttemptIdentifier inputAttemptIdentifier = new InputAttemptIdentifier(1, 1);
        Mockito.when(Boolean.valueOf(logger.isInfoEnabled())).thenReturn(false);
        for (int i = 0; i < 1000; i++) {
            fetchStatsLogger.logIndividualFetchComplete(10L, 100L, 1000L, "testType", inputAttemptIdentifier);
        }
        ((Logger) Mockito.verify(logger, Mockito.times(0))).info(Mockito.anyString());
        ((Logger) Mockito.verify(logger2, Mockito.times(1))).info(Mockito.anyString(), (Object[]) Matchers.anyVararg());
        Mockito.when(Boolean.valueOf(logger.isInfoEnabled())).thenReturn(true);
        for (int i2 = 0; i2 < 1000; i2++) {
            fetchStatsLogger.logIndividualFetchComplete(10L, 100L, 1000L, "testType", inputAttemptIdentifier);
        }
        ((Logger) Mockito.verify(logger, Mockito.times(1000))).info(Mockito.anyString());
        ((Logger) Mockito.verify(logger2, Mockito.times(1))).info(Mockito.anyString(), (Object[]) Matchers.anyVararg());
    }
}
