/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapreduce.task.reduce;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.SocketTimeoutException;
import java.net.URL;
import java.util.ArrayList;
import javax.crypto.SecretKey;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ReadaheadPool;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.IFileInputStream;
import org.apache.hadoop.mapred.IFileOutputStream;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.mapreduce.task.reduce.ExceptionReporter;
import org.apache.hadoop.mapreduce.task.reduce.Fetcher;
import org.apache.hadoop.mapreduce.task.reduce.IFileWrappedMapOutput;
import org.apache.hadoop.mapreduce.task.reduce.InMemoryMapOutput;
import org.apache.hadoop.mapreduce.task.reduce.MapHost;
import org.apache.hadoop.mapreduce.task.reduce.MergeManagerImpl;
import org.apache.hadoop.mapreduce.task.reduce.OnDiskMapOutput;
import org.apache.hadoop.mapreduce.task.reduce.ShuffleClientMetrics;
import org.apache.hadoop.mapreduce.task.reduce.ShuffleHeader;
import org.apache.hadoop.mapreduce.task.reduce.ShuffleSchedulerImpl;
import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.util.Time;
import org.assertj.core.api.AbstractIntegerAssert;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestFetcher {
    private static final Logger LOG = LoggerFactory.getLogger(TestFetcher.class);
    // Per-test fixtures; everything below that starts as null is (re)created in setup().
    // Configuration with "mapreduce.reduce.shuffle.fetch.retry.enabled" set to false.
    JobConf job = null;
    // Configuration with "mapreduce.reduce.shuffle.fetch.retry.enabled" set to true.
    JobConf jobWithRetry = null;
    // Reduce attempt id handed to every fetcher built by the tests.
    TaskAttemptID id = null;
    // Mockito mocks of the fetcher's collaborators.
    ShuffleSchedulerImpl<Text, Text> ss = null;
    MergeManagerImpl<Text, Text> mm = null;
    Reporter r = null;
    ShuffleClientMetrics metrics = null;
    ExceptionReporter except = null;
    // Secret key used by the tests to compute the "ReplyHash" header value.
    SecretKey key = null;
    // Mocked HTTP connection passed to the fetchers under test.
    HttpURLConnection connection = null;
    // Mocked counter returned by r.getCounter(..) for any group/name pair.
    Counters.Counter allErrs = null;
    // Same value the tests expect to be sent as the "UrlHash" request property.
    final String encHash = "vFE234EIFCiBgYs2tCXY/SjT8Kg=";
    // Host every test fetches from, and the two map attempts the scheduler mock reports for it.
    final MapHost host = new MapHost("localhost", "http://localhost:8080/");
    final TaskAttemptID map1ID = TaskAttemptID.forName((String)"attempt_0_1_m_1_1");
    final TaskAttemptID map2ID = TaskAttemptID.forName((String)"attempt_0_1_m_2_1");
    // Only set by tests that write to disk; teardown() deletes the per-test directory when non-null.
    FileSystem fs = null;
    // Exposes the running test's method name for logging and for per-test paths.
    @Rule
    public TestName name = new TestName();

    /**
     * Rebuilds every fixture before each test: two job configurations (fetch
     * retry disabled / enabled), fresh mocks for all fetcher collaborators, and
     * the default stubbing shared by the tests.
     */
    @Before
    @SuppressWarnings("unchecked") // mocks of generic types are assigned to parameterized fields
    public void setup() {
        LOG.info(">>>> " + name.getMethodName());
        ReadaheadPool.resetInstance();

        // Configurations: one with shuffle fetch retry off, one with it on.
        job = new JobConf();
        job.setBoolean("mapreduce.reduce.shuffle.fetch.retry.enabled", false);
        jobWithRetry = new JobConf();
        jobWithRetry.setBoolean("mapreduce.reduce.shuffle.fetch.retry.enabled", true);

        id = TaskAttemptID.forName("attempt_0_1_r_1_1");

        // Collaborator mocks.
        ss = Mockito.mock(ShuffleSchedulerImpl.class);
        mm = Mockito.mock(MergeManagerImpl.class);
        r = Mockito.mock(Reporter.class);
        metrics = Mockito.mock(ShuffleClientMetrics.class);
        except = Mockito.mock(ExceptionReporter.class);
        key = JobTokenSecretManager.createSecretKey(new byte[]{0, 0, 0, 0});
        connection = Mockito.mock(HttpURLConnection.class);
        allErrs = Mockito.mock(Counters.Counter.class);

        // Any counter lookup on the reporter yields the shared error counter mock.
        Mockito.when(r.getCounter(Mockito.anyString(), Mockito.anyString()))
            .thenReturn(allErrs);

        // The scheduler reports both map attempts as pending on the test host.
        ArrayList<TaskAttemptID> knownMaps = new ArrayList<TaskAttemptID>(1);
        knownMaps.add(map1ID);
        knownMaps.add(map2ID);
        Mockito.when(ss.getMapsForHost(host)).thenReturn(knownMaps);
    }

    /**
     * Logs the end of the test and, when a test assigned {@code fs},
     * recursively deletes the path named after the test method.
     */
    @After
    public void teardown() throws IllegalArgumentException, IOException {
        LOG.info("<<<< " + name.getMethodName());
        if (fs == null) {
            // Nothing was written through a file system by this test.
            return;
        }
        fs.delete(new Path(name.getMethodName()), true);
    }

    /**
     * When the merge manager's {@code reserve} throws a
     * {@link DiskChecker.DiskErrorException}, the fetcher must report a local
     * error to the scheduler.
     */
    @Test
    public void testReduceOutOfDiskSpace() throws Throwable {
        LOG.info("testReduceOutOfDiskSpace");
        FakeFetcher<Text, Text> fetcher = new FakeFetcher<Text, Text>(
            job, id, ss, mm, r, metrics, except, key, connection);

        String expectedReplyHash = SecureShuffleUtils.generateHash(
            "vFE234EIFCiBgYs2tCXY/SjT8Kg=".getBytes(), key);

        // Response body: a single serialized shuffle header for map 1.
        ShuffleHeader shuffleHeader = new ShuffleHeader(map1ID.toString(), 10L, 10L, 1);
        ByteArrayOutputStream headerBytes = new ByteArrayOutputStream();
        shuffleHeader.write(new DataOutputStream(headerBytes));
        ByteArrayInputStream responseBody = new ByteArrayInputStream(headerBytes.toByteArray());

        Mockito.when(connection.getResponseCode()).thenReturn(200);
        Mockito.when(connection.getHeaderField("name")).thenReturn("mapreduce");
        Mockito.when(connection.getHeaderField("version")).thenReturn("1.0.0");
        Mockito.when(connection.getHeaderField("ReplyHash")).thenReturn(expectedReplyHash);
        Mockito.when(connection.getInputStream()).thenReturn(responseBody);

        // Reserving space for the map output fails with a disk error.
        Mockito.when(mm.reserve(Mockito.any(TaskAttemptID.class), Mockito.anyLong(), Mockito.anyInt()))
            .thenThrow(new DiskChecker.DiskErrorException("No disk space available"));

        fetcher.copyFromHost(host);

        Mockito.verify(ss).reportLocalError(Mockito.any(IOException.class));
    }

    /**
     * A socket timeout while opening the response stream must count as one
     * error, fail both map attempts (without the read-error flag) and put both
     * back as known map outputs.
     */
    @Test(timeout=30000L)
    public void testCopyFromHostConnectionTimeout() throws Exception {
        Mockito.when(connection.getInputStream())
            .thenThrow(new SocketTimeoutException("This is a fake timeout :)"));

        FakeFetcher<Text, Text> fetcher = new FakeFetcher<Text, Text>(
            job, id, ss, mm, r, metrics, except, key, connection);
        fetcher.copyFromHost(host);

        Mockito.verify(connection).addRequestProperty("UrlHash", "vFE234EIFCiBgYs2tCXY/SjT8Kg=");
        Mockito.verify(allErrs).increment(1L);

        TaskAttemptID[] pendingMaps = {map1ID, map2ID};
        for (TaskAttemptID mapId : pendingMaps) {
            Mockito.verify(ss).copyFailed(mapId, host, false, false);
        }
        for (TaskAttemptID mapId : pendingMaps) {
            Mockito.verify(ss).putBackKnownMapOutput(Mockito.any(MapHost.class), Mockito.eq(mapId));
        }
    }

    /**
     * An HTTP 429 response must not be recorded as a host or fetch failure;
     * the host is penalized and both map attempts are put back.
     */
    @Test
    public void testCopyFromHostConnectionRejected() throws Exception {
        Mockito.when(connection.getResponseCode()).thenReturn(429);

        FakeFetcher<Text, Text> underTest = new FakeFetcher<Text, Text>(
            job, id, ss, mm, r, metrics, except, key, connection);
        underTest.copyFromHost(host);

        Assertions.assertThat(ss.hostFailureCount(host.getHostName()))
            .withFailMessage("No host failure is expected.")
            .isEqualTo(0);
        Assertions.assertThat(ss.fetchFailureCount(map1ID))
            .withFailMessage("No fetch failure is expected.")
            .isEqualTo(0);
        Assertions.assertThat(ss.fetchFailureCount(map2ID))
            .withFailMessage("No fetch failure is expected.")
            .isEqualTo(0);

        Mockito.verify(ss).penalize(Mockito.eq(host), Mockito.anyLong());
        Mockito.verify(ss).putBackKnownMapOutput(Mockito.any(MapHost.class), Mockito.eq(map1ID));
        Mockito.verify(ss).putBackKnownMapOutput(Mockito.any(MapHost.class), Mockito.eq(map2ID));
    }

    /**
     * A response body that is not a valid shuffle header must count as one
     * error, fail both map attempts with the read-error flag set, and put both
     * back as known map outputs.
     */
    @Test
    public void testCopyFromHostBogusHeader() throws Exception {
        FakeFetcher<Text, Text> fetcher = new FakeFetcher<Text, Text>(
            job, id, ss, mm, r, metrics, except, key, connection);

        String expectedReplyHash = SecureShuffleUtils.generateHash(
            "vFE234EIFCiBgYs2tCXY/SjT8Kg=".getBytes(), key);

        Mockito.when(connection.getResponseCode()).thenReturn(200);
        Mockito.when(connection.getHeaderField("name")).thenReturn("mapreduce");
        Mockito.when(connection.getHeaderField("version")).thenReturn("1.0.0");
        Mockito.when(connection.getHeaderField("ReplyHash")).thenReturn(expectedReplyHash);

        // Garbage where the serialized shuffle header should be.
        byte[] bogusPayload = "\u00010 BOGUS DATA\nBOGUS DATA\nBOGUS DATA\n".getBytes();
        ByteArrayInputStream responseBody = new ByteArrayInputStream(bogusPayload);
        Mockito.when(connection.getInputStream()).thenReturn(responseBody);

        fetcher.copyFromHost(host);

        Mockito.verify(connection).addRequestProperty("UrlHash", "vFE234EIFCiBgYs2tCXY/SjT8Kg=");
        Mockito.verify(allErrs).increment(1L);

        TaskAttemptID[] pendingMaps = {map1ID, map2ID};
        for (TaskAttemptID mapId : pendingMaps) {
            Mockito.verify(ss).copyFailed(mapId, host, true, false);
        }
        for (TaskAttemptID mapId : pendingMaps) {
            Mockito.verify(ss).putBackKnownMapOutput(Mockito.any(MapHost.class), Mockito.eq(mapId));
        }
    }

    /**
     * Runs three fetches (retry disabled) against responses whose name/version
     * header pair differs from mapreduce/1.0.0 each time; every fetch must
     * count one error, fail both maps and put both back.
     */
    @Test
    public void testCopyFromHostIncompatibleShuffleVersion() throws Exception {
        final int attempts = 3;
        String expectedReplyHash = SecureShuffleUtils.generateHash(
            "vFE234EIFCiBgYs2tCXY/SjT8Kg=".getBytes(), key);

        Mockito.when(connection.getResponseCode()).thenReturn(200);
        // Successive calls yield: (mapreduce, 1.0.1), (other, 1.0.0), (other, 1.0.1).
        Mockito.when(connection.getHeaderField("name"))
            .thenReturn("mapreduce")
            .thenReturn("other")
            .thenReturn("other");
        Mockito.when(connection.getHeaderField("version"))
            .thenReturn("1.0.1")
            .thenReturn("1.0.0")
            .thenReturn("1.0.1");
        Mockito.when(connection.getHeaderField("ReplyHash")).thenReturn(expectedReplyHash);

        ByteArrayInputStream emptyBody = new ByteArrayInputStream(new byte[0]);
        Mockito.when(connection.getInputStream()).thenReturn(emptyBody);

        for (int attempt = 0; attempt < attempts; ++attempt) {
            FakeFetcher<Text, Text> fetcher = new FakeFetcher<Text, Text>(
                job, id, ss, mm, r, metrics, except, key, connection);
            fetcher.copyFromHost(host);
        }

        Mockito.verify(connection, Mockito.times(attempts))
            .addRequestProperty("UrlHash", "vFE234EIFCiBgYs2tCXY/SjT8Kg=");
        Mockito.verify(allErrs, Mockito.times(attempts)).increment(1L);
        Mockito.verify(ss, Mockito.times(attempts)).copyFailed(map1ID, host, false, false);
        Mockito.verify(ss, Mockito.times(attempts)).copyFailed(map2ID, host, false, false);
        Mockito.verify(ss, Mockito.times(attempts))
            .putBackKnownMapOutput(Mockito.any(MapHost.class), Mockito.eq(map1ID));
        Mockito.verify(ss, Mockito.times(attempts))
            .putBackKnownMapOutput(Mockito.any(MapHost.class), Mockito.eq(map2ID));
    }

    /**
     * Same scenario as the incompatible-version test, but using the
     * configuration with fetch retry enabled; the expected failure accounting
     * is identical.
     */
    @Test
    public void testCopyFromHostIncompatibleShuffleVersionWithRetry() throws Exception {
        final int attempts = 3;
        String expectedReplyHash = SecureShuffleUtils.generateHash(
            "vFE234EIFCiBgYs2tCXY/SjT8Kg=".getBytes(), key);

        Mockito.when(connection.getResponseCode()).thenReturn(200);
        // Successive calls yield: (mapreduce, 1.0.1), (other, 1.0.0), (other, 1.0.1).
        Mockito.when(connection.getHeaderField("name"))
            .thenReturn("mapreduce")
            .thenReturn("other")
            .thenReturn("other");
        Mockito.when(connection.getHeaderField("version"))
            .thenReturn("1.0.1")
            .thenReturn("1.0.0")
            .thenReturn("1.0.1");
        Mockito.when(connection.getHeaderField("ReplyHash")).thenReturn(expectedReplyHash);

        ByteArrayInputStream emptyBody = new ByteArrayInputStream(new byte[0]);
        Mockito.when(connection.getInputStream()).thenReturn(emptyBody);

        for (int attempt = 0; attempt < attempts; ++attempt) {
            FakeFetcher<Text, Text> fetcher = new FakeFetcher<Text, Text>(
                jobWithRetry, id, ss, mm, r, metrics, except, key, connection);
            fetcher.copyFromHost(host);
        }

        Mockito.verify(connection, Mockito.times(attempts))
            .addRequestProperty("UrlHash", "vFE234EIFCiBgYs2tCXY/SjT8Kg=");
        Mockito.verify(allErrs, Mockito.times(attempts)).increment(1L);
        Mockito.verify(ss, Mockito.times(attempts)).copyFailed(map1ID, host, false, false);
        Mockito.verify(ss, Mockito.times(attempts)).copyFailed(map2ID, host, false, false);
        Mockito.verify(ss, Mockito.times(attempts))
            .putBackKnownMapOutput(Mockito.any(MapHost.class), Mockito.eq(map1ID));
        Mockito.verify(ss, Mockito.times(attempts))
            .putBackKnownMapOutput(Mockito.any(MapHost.class), Mockito.eq(map2ID));
    }

    /**
     * When the merge manager's {@code reserve} returns {@code null}, the fetch
     * must not be counted as an error or a failed copy; both map attempts are
     * simply put back.
     */
    @Test
    public void testCopyFromHostWait() throws Exception {
        FakeFetcher<Text, Text> fetcher = new FakeFetcher<Text, Text>(
            job, id, ss, mm, r, metrics, except, key, connection);

        String expectedReplyHash = SecureShuffleUtils.generateHash(
            "vFE234EIFCiBgYs2tCXY/SjT8Kg=".getBytes(), key);
        Mockito.when(connection.getResponseCode()).thenReturn(200);
        Mockito.when(connection.getHeaderField("ReplyHash")).thenReturn(expectedReplyHash);

        // Response body: a single serialized shuffle header for map 1.
        ShuffleHeader shuffleHeader = new ShuffleHeader(map1ID.toString(), 10L, 10L, 1);
        ByteArrayOutputStream headerBytes = new ByteArrayOutputStream();
        shuffleHeader.write(new DataOutputStream(headerBytes));
        ByteArrayInputStream responseBody = new ByteArrayInputStream(headerBytes.toByteArray());
        Mockito.when(connection.getInputStream()).thenReturn(responseBody);

        Mockito.when(connection.getHeaderField("name")).thenReturn("mapreduce");
        Mockito.when(connection.getHeaderField("version")).thenReturn("1.0.0");

        // The merge manager hands back no map output.
        Mockito.when(mm.reserve(Mockito.any(TaskAttemptID.class), Mockito.anyLong(), Mockito.anyInt()))
            .thenReturn(null);

        fetcher.copyFromHost(host);

        Mockito.verify(connection).addRequestProperty("UrlHash", "vFE234EIFCiBgYs2tCXY/SjT8Kg=");
        Mockito.verify(allErrs, Mockito.never()).increment(1L);
        Mockito.verify(ss, Mockito.never()).copyFailed(map1ID, host, true, false);
        Mockito.verify(ss, Mockito.never()).copyFailed(map2ID, host, true, false);
        Mockito.verify(ss).putBackKnownMapOutput(Mockito.any(MapHost.class), Mockito.eq(map1ID));
        Mockito.verify(ss).putBackKnownMapOutput(Mockito.any(MapHost.class), Mockito.eq(map2ID));
    }

    /**
     * An {@link InternalError} thrown from the map output's {@code shuffle}
     * must be reported as exactly one failed copy of map 1 with the read-error
     * flag set.
     */
    @Test(timeout=10000L)
    public void testCopyFromHostCompressFailure() throws Exception {
        @SuppressWarnings("unchecked") // mock of a generic type
        InMemoryMapOutput<Text, Text> failingOutput = Mockito.mock(InMemoryMapOutput.class);
        FakeFetcher<Text, Text> fetcher = new FakeFetcher<Text, Text>(
            job, id, ss, mm, r, metrics, except, key, connection);

        String expectedReplyHash = SecureShuffleUtils.generateHash(
            "vFE234EIFCiBgYs2tCXY/SjT8Kg=".getBytes(), key);
        Mockito.when(connection.getResponseCode()).thenReturn(200);
        Mockito.when(connection.getHeaderField("ReplyHash")).thenReturn(expectedReplyHash);

        // Response body: a single serialized shuffle header for map 1.
        ShuffleHeader shuffleHeader = new ShuffleHeader(map1ID.toString(), 10L, 10L, 1);
        ByteArrayOutputStream headerBytes = new ByteArrayOutputStream();
        shuffleHeader.write(new DataOutputStream(headerBytes));
        ByteArrayInputStream responseBody = new ByteArrayInputStream(headerBytes.toByteArray());
        Mockito.when(connection.getInputStream()).thenReturn(responseBody);

        Mockito.when(connection.getHeaderField("name")).thenReturn("mapreduce");
        Mockito.when(connection.getHeaderField("version")).thenReturn("1.0.0");
        Mockito.when(mm.reserve(Mockito.any(TaskAttemptID.class), Mockito.anyLong(), Mockito.anyInt()))
            .thenReturn(failingOutput);

        // Shuffling the reserved output blows up with an Error rather than an IOException.
        Mockito.doThrow(new InternalError())
            .when(failingOutput)
            .shuffle(
                Mockito.any(MapHost.class),
                Mockito.any(InputStream.class),
                Mockito.anyLong(),
                Mockito.anyLong(),
                Mockito.any(ShuffleClientMetrics.class),
                Mockito.any(Reporter.class));

        fetcher.copyFromHost(host);

        Mockito.verify(connection).addRequestProperty("UrlHash", "vFE234EIFCiBgYs2tCXY/SjT8Kg=");
        Mockito.verify(ss, Mockito.times(1)).copyFailed(map1ID, host, true, false);
    }

    /**
     * An unchecked {@link ArrayIndexOutOfBoundsException} thrown from the map
     * output's {@code shuffle} must be reported as exactly one failed copy of
     * map 1 with the read-error flag set.
     */
    @Test(timeout=10000L)
    public void testCopyFromHostOnAnyException() throws Exception {
        @SuppressWarnings("unchecked") // mock of a generic type
        InMemoryMapOutput<Text, Text> failingOutput = Mockito.mock(InMemoryMapOutput.class);
        FakeFetcher<Text, Text> fetcher = new FakeFetcher<Text, Text>(
            job, id, ss, mm, r, metrics, except, key, connection);

        String expectedReplyHash = SecureShuffleUtils.generateHash(
            "vFE234EIFCiBgYs2tCXY/SjT8Kg=".getBytes(), key);
        Mockito.when(connection.getResponseCode()).thenReturn(200);
        Mockito.when(connection.getHeaderField("ReplyHash")).thenReturn(expectedReplyHash);

        // Response body: a single serialized shuffle header for map 1.
        ShuffleHeader shuffleHeader = new ShuffleHeader(map1ID.toString(), 10L, 10L, 1);
        ByteArrayOutputStream headerBytes = new ByteArrayOutputStream();
        shuffleHeader.write(new DataOutputStream(headerBytes));
        ByteArrayInputStream responseBody = new ByteArrayInputStream(headerBytes.toByteArray());
        Mockito.when(connection.getInputStream()).thenReturn(responseBody);

        Mockito.when(connection.getHeaderField("name")).thenReturn("mapreduce");
        Mockito.when(connection.getHeaderField("version")).thenReturn("1.0.0");
        Mockito.when(mm.reserve(Mockito.any(TaskAttemptID.class), Mockito.anyLong(), Mockito.anyInt()))
            .thenReturn(failingOutput);

        // Shuffling the reserved output fails with a runtime exception.
        Mockito.doThrow(new ArrayIndexOutOfBoundsException())
            .when(failingOutput)
            .shuffle(
                Mockito.any(MapHost.class),
                Mockito.any(InputStream.class),
                Mockito.anyLong(),
                Mockito.anyLong(),
                Mockito.any(ShuffleClientMetrics.class),
                Mockito.any(Reporter.class));

        fetcher.copyFromHost(host);

        Mockito.verify(connection).addRequestProperty("UrlHash", "vFE234EIFCiBgYs2tCXY/SjT8Kg=");
        Mockito.verify(ss, Mockito.times(1)).copyFailed(map1ID, host, true, false);
    }

    /**
     * With fetch retry enabled, {@code shuffle} throws {@link InternalError}
     * for the first three seconds and then succeeds; no copy failure may be
     * reported to the (freshly mocked) scheduler.
     */
    @Test(timeout=10000L)
    @SuppressWarnings("unchecked") // mocks of generic types
    public void testCopyFromHostWithRetry() throws Exception {
        InMemoryMapOutput<Text, Text> flakyOutput = Mockito.mock(InMemoryMapOutput.class);
        // Replace the scheduler from setup() with an unstubbed mock.
        ss = Mockito.mock(ShuffleSchedulerImpl.class);
        FakeFetcher<Text, Text> fetcher = new FakeFetcher<Text, Text>(
            jobWithRetry, id, ss, mm, r, metrics, except, key, connection, true);

        String expectedReplyHash = SecureShuffleUtils.generateHash(
            "vFE234EIFCiBgYs2tCXY/SjT8Kg=".getBytes(), key);
        Mockito.when(connection.getResponseCode()).thenReturn(200);
        Mockito.when(connection.getHeaderField("ReplyHash")).thenReturn(expectedReplyHash);

        // Response body: a single serialized shuffle header for map 1.
        ShuffleHeader shuffleHeader = new ShuffleHeader(map1ID.toString(), 10L, 10L, 1);
        ByteArrayOutputStream headerBytes = new ByteArrayOutputStream();
        shuffleHeader.write(new DataOutputStream(headerBytes));
        ByteArrayInputStream responseBody = new ByteArrayInputStream(headerBytes.toByteArray());
        Mockito.when(connection.getInputStream()).thenReturn(responseBody);

        Mockito.when(connection.getHeaderField("name")).thenReturn("mapreduce");
        Mockito.when(connection.getHeaderField("version")).thenReturn("1.0.0");
        Mockito.when(mm.reserve(Mockito.any(TaskAttemptID.class), Mockito.anyLong(), Mockito.anyInt()))
            .thenReturn(flakyOutput);

        final long retryTime = Time.monotonicNow();
        Mockito.doAnswer(new Answer<Void>() {
            @Override
            public Void answer(InvocationOnMock ignore) throws IOException {
                // Keep failing until more than three seconds have elapsed.
                boolean stillFailing = Time.monotonicNow() - retryTime <= 3000L;
                if (stillFailing) {
                    throw new InternalError();
                }
                return null;
            }
        }).when(flakyOutput).shuffle(
            Mockito.any(MapHost.class),
            Mockito.any(InputStream.class),
            Mockito.anyLong(),
            Mockito.anyLong(),
            Mockito.any(ShuffleClientMetrics.class),
            Mockito.any(Reporter.class));

        fetcher.copyFromHost(host);

        Mockito.verify(ss, Mockito.never()).copyFailed(
            Mockito.any(TaskAttemptID.class),
            Mockito.any(MapHost.class),
            Mockito.anyBoolean(),
            Mockito.anyBoolean());
    }

    /**
     * With fetch retry enabled, the first response code is 200 and every later
     * request times out while {@code shuffle} always fails with an
     * {@link IOException}; exactly one error is counted and each map attempt
     * is failed and put back once.
     */
    @Test(timeout=10000L)
    public void testCopyFromHostWithRetryThenTimeout() throws Exception {
        @SuppressWarnings("unchecked") // mock of a generic type
        InMemoryMapOutput<Text, Text> failingOutput = Mockito.mock(InMemoryMapOutput.class);
        FakeFetcher<Text, Text> fetcher = new FakeFetcher<Text, Text>(
            jobWithRetry, id, ss, mm, r, metrics, except, key, connection);

        String expectedReplyHash = SecureShuffleUtils.generateHash(
            "vFE234EIFCiBgYs2tCXY/SjT8Kg=".getBytes(), key);
        // First connect succeeds; any reconnect attempt times out.
        Mockito.when(connection.getResponseCode())
            .thenReturn(200)
            .thenThrow(new SocketTimeoutException("forced timeout"));
        Mockito.when(connection.getHeaderField("ReplyHash")).thenReturn(expectedReplyHash);

        // Response body: a single serialized shuffle header for map 1.
        ShuffleHeader shuffleHeader = new ShuffleHeader(map1ID.toString(), 10L, 10L, 1);
        ByteArrayOutputStream headerBytes = new ByteArrayOutputStream();
        shuffleHeader.write(new DataOutputStream(headerBytes));
        ByteArrayInputStream responseBody = new ByteArrayInputStream(headerBytes.toByteArray());
        Mockito.when(connection.getInputStream()).thenReturn(responseBody);

        Mockito.when(connection.getHeaderField("name")).thenReturn("mapreduce");
        Mockito.when(connection.getHeaderField("version")).thenReturn("1.0.0");
        Mockito.when(mm.reserve(Mockito.any(TaskAttemptID.class), Mockito.anyLong(), Mockito.anyInt()))
            .thenReturn(failingOutput);

        Mockito.doThrow(new IOException("forced error"))
            .when(failingOutput)
            .shuffle(
                Mockito.any(MapHost.class),
                Mockito.any(InputStream.class),
                Mockito.anyLong(),
                Mockito.anyLong(),
                Mockito.any(ShuffleClientMetrics.class),
                Mockito.any(Reporter.class));

        fetcher.copyFromHost(host);

        Mockito.verify(allErrs).increment(1L);
        Mockito.verify(ss, Mockito.times(1)).copyFailed(map1ID, host, false, false);
        Mockito.verify(ss, Mockito.times(1)).copyFailed(map2ID, host, false, false);
        Mockito.verify(ss, Mockito.times(1))
            .putBackKnownMapOutput(Mockito.any(MapHost.class), Mockito.eq(map1ID));
        Mockito.verify(ss, Mockito.times(1))
            .putBackKnownMapOutput(Mockito.any(MapHost.class), Mockito.eq(map2ID));
    }

    /**
     * Streams two IFile-wrapped map outputs of ten payload bytes each while the
     * in-memory output reserved for map 1 is sized at only 8 bytes. Expects one
     * error and a read-error copy failure for map 1 only, with both map
     * attempts put back.
     */
    @Test
    public void testCopyFromHostExtraBytes() throws Exception {
        FakeFetcher<Text, Text> fetcher = new FakeFetcher<Text, Text>(
            job, id, ss, mm, r, metrics, except, key, connection);

        String expectedReplyHash = SecureShuffleUtils.generateHash(
            "vFE234EIFCiBgYs2tCXY/SjT8Kg=".getBytes(), key);

        Mockito.when(connection.getResponseCode()).thenReturn(200);
        Mockito.when(connection.getHeaderField("name")).thenReturn("mapreduce");
        Mockito.when(connection.getHeaderField("version")).thenReturn("1.0.0");
        Mockito.when(connection.getHeaderField("ReplyHash")).thenReturn(expectedReplyHash);

        ByteArrayOutputStream payload = new ByteArrayOutputStream();
        DataOutputStream payloadOut = new DataOutputStream(payload);

        // First segment: header + checksummed data for map 1.
        ShuffleHeader firstHeader = new ShuffleHeader(map1ID.toString(), 14L, 10L, 1);
        IFileOutputStream firstSegment = new IFileOutputStream(payloadOut);
        firstHeader.write(payloadOut);
        firstSegment.write("MAPDATA123".getBytes());
        firstSegment.finish();

        // Second segment: header + checksummed data for map 2.
        ShuffleHeader secondHeader = new ShuffleHeader(map2ID.toString(), 14L, 10L, 1);
        IFileOutputStream secondSegment = new IFileOutputStream(payloadOut);
        secondHeader.write(payloadOut);
        secondSegment.write("MAPDATA456".getBytes());
        secondSegment.finish();

        ByteArrayInputStream responseBody = new ByteArrayInputStream(payload.toByteArray());
        Mockito.when(connection.getInputStream()).thenReturn(responseBody);

        // Map 1 gets an 8-byte buffer, map 2 a 10-byte buffer.
        InMemoryMapOutput<Text, Text> firstOutput =
            new InMemoryMapOutput<Text, Text>(job, map1ID, mm, 8, null, true);
        InMemoryMapOutput<Text, Text> secondOutput =
            new InMemoryMapOutput<Text, Text>(job, map2ID, mm, 10, null, true);
        Mockito.when(mm.reserve(Mockito.eq(map1ID), Mockito.anyLong(), Mockito.anyInt()))
            .thenReturn(firstOutput);
        Mockito.when(mm.reserve(Mockito.eq(map2ID), Mockito.anyLong(), Mockito.anyInt()))
            .thenReturn(secondOutput);

        fetcher.copyFromHost(host);

        Mockito.verify(allErrs).increment(1L);
        Mockito.verify(ss).copyFailed(map1ID, host, true, false);
        Mockito.verify(ss, Mockito.never()).copyFailed(map2ID, host, true, false);
        Mockito.verify(ss).putBackKnownMapOutput(Mockito.any(MapHost.class), Mockito.eq(map1ID));
        Mockito.verify(ss).putBackKnownMapOutput(Mockito.any(MapHost.class), Mockito.eq(map2ID));
    }

    /**
     * Verifies that {@link OnDiskMapOutput#shuffle} detects a corrupted IFile.
     *
     * <p>The test shuffles an intact IFile-wrapped payload to disk, then
     * shuffles a copy with one data byte zeroed and expects a
     * {@link ChecksumException}. Finally it re-reads the temp file that the
     * shuffle wrote, through an {@link IFileInputStream}.
     *
     * @throws Exception if any stream or file system operation fails unexpectedly
     */
    @Test
    public void testCorruptedIFile() throws Exception {
        final int fetcher = 7;
        Path onDiskMapOutputPath = new Path(name.getMethodName() + "/foo");
        Path shuffledToDisk = OnDiskMapOutput.getTempPath(onDiskMapOutputPath, fetcher);
        // Assigning the field lets teardown() delete the per-test directory.
        fs = FileSystem.getLocal(job).getRaw();
        OnDiskMapOutput<Text, Text> odmo = new OnDiskMapOutput<Text, Text>(
            map1ID, mm, 100L, job, fetcher, true, fs, onDiskMapOutputPath);

        // Build "shuffle header + IFile-wrapped data" in memory.
        String mapData = "MAPDATA12345678901234567890";
        ShuffleHeader header = new ShuffleHeader(map1ID.toString(), 14L, 10L, 1);
        ByteArrayOutputStream bout = new ByteArrayOutputStream();
        DataOutputStream dos = new DataOutputStream(bout);
        IFileOutputStream ios = new IFileOutputStream(dos);
        header.write(dos);
        int headerSize = dos.size();
        try {
            ios.write(mapData.getBytes());
        } finally {
            ios.close();
        }
        // Everything after the header is the IFile segment.
        int dataSize = bout.size() - headerSize;

        // The intact payload must shuffle to disk without error.
        // (Local is named testHost so it does not shadow the host field.)
        MapHost testHost = new MapHost("TestHost", "http://test/url");
        try (ByteArrayInputStream intactIn = new ByteArrayInputStream(bout.toByteArray())) {
            // Read past the shuffle header.
            intactIn.read(new byte[headerSize], 0, headerSize);
            odmo.shuffle(testHost, intactIn, dataSize, dataSize, metrics, Reporter.NULL);
        }

        // Zero one byte in the middle of the IFile segment.
        byte[] corrupted = bout.toByteArray();
        corrupted[headerSize + dataSize / 2] = 0;

        // Each stream gets its own try-with-resources variable, so nothing is
        // referenced outside the scope in which it was declared.
        try (ByteArrayInputStream corruptedIn = new ByteArrayInputStream(corrupted)) {
            // Read past the shuffle header.
            corruptedIn.read(new byte[headerSize], 0, headerSize);
            odmo.shuffle(testHost, corruptedIn, dataSize, dataSize, metrics, Reporter.NULL);
            Assert.fail("OnDiskMapOutput.shuffle didn't detect the corrupted map partition file");
        } catch (ChecksumException e) {
            LOG.info("The expected checksum exception was thrown.", e);
        }

        // Re-read the temp file written by the shuffle.
        try (IFileInputStream iFin = new IFileInputStream(fs.open(shuffledToDisk), dataSize, job)) {
            iFin.read(new byte[dataSize], 0, dataSize);
        }
    }

    /**
     * Starts a fetcher whose reply stream stalls after the shuffle header, shuts the
     * fetcher down, and checks that the stream was closed and that the in-memory map
     * output handed out by the merge manager was aborted.
     */
    @Test(timeout=10000L)
    public void testInterruptInMemory() throws Exception {
        int fetcherId = 2;

        // The merge manager hands out a spied in-memory output so abort() can be verified.
        IFileWrappedMapOutput inMemoryOutput = (IFileWrappedMapOutput)Mockito.spy((Object)new InMemoryMapOutput((Configuration)this.job, this.id, this.mm, 100, null, true));
        Mockito.when((Object)this.mm.reserve((TaskAttemptID)Mockito.any(TaskAttemptID.class), Mockito.anyLong(), Mockito.anyInt())).thenReturn((Object)inMemoryOutput);
        ((MergeManagerImpl)Mockito.doNothing().when(this.mm)).waitForResource();
        Mockito.when((Object)this.ss.getHost()).thenReturn((Object)this.host);

        // Canned HTTP reply: status, shuffle name/version and the reply hash.
        String expectedReplyHash = SecureShuffleUtils.generateHash("vFE234EIFCiBgYs2tCXY/SjT8Kg=".getBytes(), (SecretKey)this.key);
        Mockito.when(this.connection.getResponseCode()).thenReturn(200);
        Mockito.when(this.connection.getHeaderField("name")).thenReturn("mapreduce");
        Mockito.when(this.connection.getHeaderField("version")).thenReturn("1.0.0");
        Mockito.when(this.connection.getHeaderField("ReplyHash")).thenReturn(expectedReplyHash);

        // The body holds only a serialized shuffle header; reads past it stall in StuckInputStream.
        ShuffleHeader shuffleHeader = new ShuffleHeader(this.map1ID.toString(), 10L, 10L, 1);
        ByteArrayOutputStream headerBytes = new ByteArrayOutputStream();
        shuffleHeader.write((DataOutput)new DataOutputStream(headerBytes));
        final StuckInputStream stalledStream = new StuckInputStream(new ByteArrayInputStream(headerBytes.toByteArray()));
        Mockito.when(this.connection.getInputStream()).thenReturn(stalledStream);

        // Disconnecting the mocked connection closes the stalled stream.
        Answer<Void> closeStreamOnDisconnect = new Answer<Void>(){

            public Void answer(InvocationOnMock ignore) throws IOException {
                stalledStream.close();
                return null;
            }
        };
        Mockito.doAnswer(closeStreamOnDisconnect).when(this.connection).disconnect();

        FakeFetcher<Text, Text> fetcher = new FakeFetcher<Text, Text>(this.job, this.id, this.ss, this.mm, this.r, this.metrics, this.except, this.key, this.connection, fetcherId);
        fetcher.start();
        // Block until the fetcher thread has reached the stalled read.
        stalledStream.waitForFetcher();
        fetcher.shutDown();
        fetcher.join();

        Assert.assertTrue(stalledStream.wasClosedProperly());
        ((IFileWrappedMapOutput)Mockito.verify((Object)inMemoryOutput)).abort();
    }

    /**
     * Starts a fetcher whose reply stream stalls after the shuffle header while the
     * output is being shuffled to a mocked file system, shuts the fetcher down, and
     * checks that the stream was closed, that the temporary file was created and then
     * deleted, and that the on-disk map output was aborted.
     */
    @Test(timeout=10000L)
    public void testInterruptOnDisk() throws Exception {
        int fetcherId = 7;
        Path outputPath = new Path("file:///tmp/foo");
        Path tempPath = OnDiskMapOutput.getTempPath((Path)outputPath, (int)fetcherId);

        // The merge manager hands out a spied on-disk output backed by a deep-stub file system.
        FileSystem mockFs = (FileSystem)Mockito.mock(FileSystem.class, (Answer)Mockito.RETURNS_DEEP_STUBS);
        IFileWrappedMapOutput onDiskOutput = (IFileWrappedMapOutput)Mockito.spy((Object)new OnDiskMapOutput(this.map1ID, this.mm, 100L, this.job, fetcherId, true, mockFs, outputPath));
        Mockito.when((Object)this.mm.reserve((TaskAttemptID)Mockito.any(TaskAttemptID.class), Mockito.anyLong(), Mockito.anyInt())).thenReturn((Object)onDiskOutput);
        ((MergeManagerImpl)Mockito.doNothing().when(this.mm)).waitForResource();
        Mockito.when((Object)this.ss.getHost()).thenReturn((Object)this.host);

        // Canned HTTP reply: status and reply hash.
        String expectedReplyHash = SecureShuffleUtils.generateHash("vFE234EIFCiBgYs2tCXY/SjT8Kg=".getBytes(), (SecretKey)this.key);
        Mockito.when(this.connection.getResponseCode()).thenReturn(200);
        Mockito.when(this.connection.getHeaderField("ReplyHash")).thenReturn(expectedReplyHash);

        // The body holds only a serialized shuffle header; reads past it stall in StuckInputStream.
        ShuffleHeader shuffleHeader = new ShuffleHeader(this.map1ID.toString(), 10L, 10L, 1);
        ByteArrayOutputStream headerBytes = new ByteArrayOutputStream();
        shuffleHeader.write((DataOutput)new DataOutputStream(headerBytes));
        final StuckInputStream stalledStream = new StuckInputStream(new ByteArrayInputStream(headerBytes.toByteArray()));
        Mockito.when(this.connection.getInputStream()).thenReturn(stalledStream);
        Mockito.when(this.connection.getHeaderField("name")).thenReturn("mapreduce");
        Mockito.when(this.connection.getHeaderField("version")).thenReturn("1.0.0");

        // Disconnecting the mocked connection closes the stalled stream.
        Answer<Void> closeStreamOnDisconnect = new Answer<Void>(){

            public Void answer(InvocationOnMock ignore) throws IOException {
                stalledStream.close();
                return null;
            }
        };
        Mockito.doAnswer(closeStreamOnDisconnect).when(this.connection).disconnect();

        FakeFetcher<Text, Text> fetcher = new FakeFetcher<Text, Text>(this.job, this.id, this.ss, this.mm, this.r, this.metrics, this.except, this.key, this.connection, fetcherId);
        fetcher.start();
        // Block until the fetcher thread has reached the stalled read.
        stalledStream.waitForFetcher();
        fetcher.shutDown();
        fetcher.join();

        Assert.assertTrue(stalledStream.wasClosedProperly());
        Mockito.verify(mockFs).create(Mockito.eq(tempPath));
        Mockito.verify(mockFs).delete(Mockito.eq(tempPath), Mockito.eq(false));
        Mockito.verify(onDiskOutput).abort();
    }

    /**
     * Runs {@code copyFromHost} with the retry-enabled job configuration against a map
     * output whose {@code shuffle} always throws, and checks that the output handed out
     * by the merge manager is aborted.
     */
    @Test(timeout=10000L)
    public void testCopyFromHostWithRetryUnreserve() throws Exception {
        InMemoryMapOutput failingOutput = (InMemoryMapOutput)Mockito.mock(InMemoryMapOutput.class);
        FakeFetcher<Text, Text> fetcher = new FakeFetcher<Text, Text>(this.jobWithRetry, this.id, this.ss, this.mm, this.r, this.metrics, this.except, this.key, this.connection);

        // Canned HTTP reply: status and reply hash.
        String expectedReplyHash = SecureShuffleUtils.generateHash("vFE234EIFCiBgYs2tCXY/SjT8Kg=".getBytes(), (SecretKey)this.key);
        Mockito.when(this.connection.getResponseCode()).thenReturn(200);
        Mockito.when(this.connection.getHeaderField("ReplyHash")).thenReturn(expectedReplyHash);

        // The reply body is a single serialized shuffle header.
        ShuffleHeader shuffleHeader = new ShuffleHeader(this.map1ID.toString(), 10L, 10L, 1);
        ByteArrayOutputStream headerBytes = new ByteArrayOutputStream();
        shuffleHeader.write((DataOutput)new DataOutputStream(headerBytes));
        ByteArrayInputStream replyBody = new ByteArrayInputStream(headerBytes.toByteArray());
        Mockito.when(this.connection.getInputStream()).thenReturn(replyBody);
        Mockito.when(this.connection.getHeaderField("name")).thenReturn("mapreduce");
        Mockito.when(this.connection.getHeaderField("version")).thenReturn("1.0.0");

        // Every shuffle attempt on the reserved output fails with an IOException.
        Mockito.when((Object)this.mm.reserve((TaskAttemptID)Mockito.any(TaskAttemptID.class), Mockito.anyLong(), Mockito.anyInt())).thenReturn((Object)failingOutput);
        ((InMemoryMapOutput)Mockito.doThrow((Throwable[])new Throwable[]{new IOException("forced error")}).when((Object)failingOutput)).shuffle((MapHost)Mockito.any(MapHost.class), (InputStream)Mockito.any(InputStream.class), Mockito.anyLong(), Mockito.anyLong(), (ShuffleClientMetrics)Mockito.any(ShuffleClientMetrics.class), (Reporter)Mockito.any(Reporter.class));

        fetcher.copyFromHost(this.host);

        ((InMemoryMapOutput)Mockito.verify((Object)failingOutput)).abort();
    }

    /**
     * {@link Fetcher} subclass for tests that lets the caller inject the connection to
     * use. While an injected connection is present and renewal is not requested,
     * {@link #openConnection(URL)} does nothing, so the injected connection is kept.
     */
    public static class FakeFetcher<K, V>
    extends Fetcher<K, V> {
        /** When true, {@code openConnection} always delegates to the superclass. */
        private boolean renewConnection = false;

        /**
         * Creates a fetcher that uses the supplied connection and never renews it.
         */
        public FakeFetcher(JobConf job, TaskAttemptID reduceId, ShuffleSchedulerImpl<K, V> scheduler, MergeManagerImpl<K, V> merger, Reporter reporter, ShuffleClientMetrics metrics, ExceptionReporter exceptionReporter, SecretKey jobTokenSecret, HttpURLConnection connection) {
            this(job, reduceId, scheduler, merger, reporter, metrics, exceptionReporter, jobTokenSecret, connection, false);
        }

        /**
         * Creates a fetcher that uses the supplied connection.
         *
         * @param renewConnection if true, every {@code openConnection} call is forwarded
         *        to the superclass even though a connection was supplied
         */
        public FakeFetcher(JobConf job, TaskAttemptID reduceId, ShuffleSchedulerImpl<K, V> scheduler, MergeManagerImpl<K, V> merger, Reporter reporter, ShuffleClientMetrics metrics, ExceptionReporter exceptionReporter, SecretKey jobTokenSecret, HttpURLConnection connection, boolean renewConnection) {
            super(job, reduceId, scheduler, merger, reporter, metrics, exceptionReporter, jobTokenSecret);
            this.renewConnection = renewConnection;
            this.connection = connection;
        }

        /**
         * Creates a fetcher with an explicit id that uses the supplied connection and
         * never renews it.
         *
         * @param id value passed through to the nine-argument superclass constructor
         */
        public FakeFetcher(JobConf job, TaskAttemptID reduceId, ShuffleSchedulerImpl<K, V> scheduler, MergeManagerImpl<K, V> merger, Reporter reporter, ShuffleClientMetrics metrics, ExceptionReporter exceptionReporter, SecretKey jobTokenSecret, HttpURLConnection connection, int id) {
            super(job, reduceId, scheduler, merger, reporter, metrics, exceptionReporter, jobTokenSecret, id);
            this.connection = connection;
        }

        /**
         * Delegates to the superclass only when there is no connection yet or renewal
         * was requested; otherwise leaves the current connection untouched.
         */
        protected void openConnection(URL url) throws IOException {
            boolean keepCurrentConnection = this.connection != null && !this.renewConnection;
            if (keepCurrentConnection) {
                return;
            }
            super.openConnection(url);
        }
    }

    /**
     * Input stream wrapper that serves the wrapped stream's bytes normally and, once the
     * wrapped stream reports end-of-stream, stalls the reader in {@link #freeze()} until
     * the reading thread is interrupted or {@link #close()} has been called.
     */
    static class StuckInputStream
    extends FilterInputStream {
        /** Set under this object's monitor once a reader has entered {@link #freeze()}. */
        boolean stuck = false;
        /** Set by {@link #close()}; polled by the spinning reader. */
        volatile boolean closed = false;

        StuckInputStream(InputStream inner) {
            super(inner);
        }

        /**
         * Marks the stream as stuck, wakes one thread waiting in
         * {@link #waitForFetcher()}, and then spins.
         *
         * @return 0 once the current thread is interrupted while the stream is not closed
         * @throws IOException as soon as the stream is observed to be closed
         */
        int freeze() throws IOException {
            synchronized (this) {
                this.stuck = true;
                this.notify();
            }
            // Busy-wait: nothing here blocks, so the interrupt flag and the closed flag
            // are simply polled until one of them ends the stall.
            while (true) {
                boolean interrupted = Thread.currentThread().isInterrupted();
                if (interrupted && !this.closed) {
                    return 0;
                }
                if (this.closed) {
                    throw new IOException("underlying stream closed, triggered an error");
                }
            }
        }

        /** Reads one byte, stalling in {@link #freeze()} at end-of-stream. */
        @Override
        public int read() throws IOException {
            int value = super.read();
            return value != -1 ? value : this.freeze();
        }

        /** Reads into {@code b}, stalling in {@link #freeze()} at end-of-stream. */
        @Override
        public int read(byte[] b) throws IOException {
            int count = super.read(b);
            return count != -1 ? count : this.freeze();
        }

        /** Reads into a slice of {@code b}, stalling in {@link #freeze()} at end-of-stream. */
        @Override
        public int read(byte[] b, int off, int len) throws IOException {
            int count = super.read(b, off, len);
            return count != -1 ? count : this.freeze();
        }

        /** Only records the close request; the wrapped stream's close is not invoked. */
        @Override
        public void close() throws IOException {
            this.closed = true;
        }

        /** Blocks the caller until some reader has entered {@link #freeze()}. */
        public synchronized void waitForFetcher() throws InterruptedException {
            while (!this.stuck) {
                this.wait();
            }
        }

        /** @return whether {@link #close()} has been called */
        public boolean wasClosedProperly() {
            return this.closed;
        }
    }
}

