/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.runtime.library.common.shuffle;

import com.google.common.annotations.VisibleForTesting;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.net.SocketTimeoutException;
import java.net.URL;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.tez.common.CallableWithNdc;
import org.apache.tez.common.Preconditions;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.http.BaseHttpConnection;
import org.apache.tez.http.HttpConnectionParams;
import org.apache.tez.runtime.api.InputContext;
import org.apache.tez.runtime.library.common.CompositeInputAttemptIdentifier;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.shuffle.DiskFetchedInput;
import org.apache.tez.runtime.library.common.shuffle.FetchResult;
import org.apache.tez.runtime.library.common.shuffle.FetchedInput;
import org.apache.tez.runtime.library.common.shuffle.FetchedInputAllocator;
import org.apache.tez.runtime.library.common.shuffle.FetchedInputCallback;
import org.apache.tez.runtime.library.common.shuffle.FetcherCallback;
import org.apache.tez.runtime.library.common.shuffle.FetcherWithInjectableErrors;
import org.apache.tez.runtime.library.common.shuffle.InputAttemptFetchFailure;
import org.apache.tez.runtime.library.common.shuffle.LocalDiskFetchedInput;
import org.apache.tez.runtime.library.common.shuffle.MemoryFetchedInput;
import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
import org.apache.tez.runtime.library.common.shuffle.api.ShuffleHandlerError;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleHeader;
import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
import org.apache.tez.runtime.library.exceptions.FetcherReadTimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Fetcher
extends CallableWithNdc<FetchResult> {
    private static final Logger LOG = LoggerFactory.getLogger(Fetcher.class);
    private static final AtomicInteger fetcherIdGen = new AtomicInteger(0);
    private final Configuration conf;
    private final int shufflePort;
    private CompressionCodec codec;
    private boolean ifileReadAhead = true;
    private int ifileReadAheadLength = 0x400000;
    private final JobTokenSecretManager jobTokenSecretMgr;
    private final FetcherCallback fetcherCallback;
    private final FetchedInputAllocator inputManager;
    private final ApplicationId appId;
    private final int dagIdentifier;
    private final String logIdentifier;
    private final String localHostname;
    private final AtomicBoolean isShutDown = new AtomicBoolean(false);
    protected final int fetcherIdentifier;
    private List<InputAttemptIdentifier> srcAttempts;
    @VisibleForTesting
    Map<String, InputAttemptIdentifier> srcAttemptsRemaining;
    private String host;
    private int port;
    private int partition;
    private int partitionCount;
    private final Map<PathPartition, InputAttemptIdentifier> pathToAttemptMap;
    private URL url;
    private volatile DataInputStream input;
    BaseHttpConnection httpConnection;
    private HttpConnectionParams httpConnectionParams;
    private final boolean localDiskFetchEnabled;
    private final boolean sharedFetchEnabled;
    private final LocalDirAllocator localDirAllocator;
    private final Path lockPath;
    private final RawLocalFileSystem localFs;
    private long retryStartTime = 0L;
    private final boolean asyncHttp;
    private final boolean compositeFetch;
    private final boolean verifyDiskChecksum;
    private final boolean isDebugEnabled = LOG.isDebugEnabled();

    @VisibleForTesting
    public List<InputAttemptIdentifier> getSrcAttempts() {
        return this.srcAttempts;
    }

    @VisibleForTesting
    public String getHost() {
        return this.host;
    }

    protected Fetcher(FetcherCallback fetcherCallback, HttpConnectionParams params, FetchedInputAllocator inputManager, InputContext inputContext, JobTokenSecretManager jobTokenSecretManager, Configuration conf, RawLocalFileSystem localFs, LocalDirAllocator localDirAllocator, Path lockPath, boolean localDiskFetchEnabled, boolean sharedFetchEnabled, String localHostname, int shufflePort, boolean asyncHttp, boolean verifyDiskChecksum, boolean compositeFetch) {
        this.asyncHttp = asyncHttp;
        this.verifyDiskChecksum = verifyDiskChecksum;
        this.fetcherCallback = fetcherCallback;
        this.inputManager = inputManager;
        this.jobTokenSecretMgr = jobTokenSecretManager;
        this.appId = inputContext.getApplicationId();
        this.dagIdentifier = inputContext.getDagIdentifier();
        this.pathToAttemptMap = new HashMap<PathPartition, InputAttemptIdentifier>();
        this.httpConnectionParams = params;
        this.conf = conf;
        this.localDiskFetchEnabled = localDiskFetchEnabled;
        this.sharedFetchEnabled = sharedFetchEnabled;
        this.fetcherIdentifier = fetcherIdGen.getAndIncrement();
        String sourceDestNameTrimmed = TezUtilsInternal.cleanVertexName((String)inputContext.getSourceVertexName()) + " -> " + TezUtilsInternal.cleanVertexName((String)inputContext.getTaskVertexName());
        this.logIdentifier = " fetcher [" + sourceDestNameTrimmed + "] " + this.fetcherIdentifier;
        this.localFs = localFs;
        this.localDirAllocator = localDirAllocator;
        this.lockPath = lockPath;
        this.localHostname = localHostname;
        this.shufflePort = shufflePort;
        this.compositeFetch = compositeFetch;
        try {
            if (this.sharedFetchEnabled) {
                this.localFs.mkdirs(this.lockPath);
            }
        }
        catch (Exception e) {
            LOG.warn("Error initializing local dirs for shared transfer " + e);
        }
    }

    void populateRemainingMap(List<InputAttemptIdentifier> origlist) {
        if (this.srcAttemptsRemaining == null) {
            this.srcAttemptsRemaining = new LinkedHashMap<String, InputAttemptIdentifier>(origlist.size());
        }
        for (InputAttemptIdentifier id : origlist) {
            this.srcAttemptsRemaining.put(id.toString(), id);
        }
    }

    public FetchResult callInternal() throws Exception {
        boolean isLocalFetch;
        boolean multiplex;
        boolean bl = multiplex = this.sharedFetchEnabled && this.localDiskFetchEnabled;
        if (this.srcAttempts.size() == 0) {
            return new FetchResult(this.host, this.port, this.partition, this.partitionCount, this.srcAttempts);
        }
        this.populateRemainingMap(this.srcAttempts);
        for (InputAttemptIdentifier in : this.srcAttemptsRemaining.values()) {
            if (in instanceof CompositeInputAttemptIdentifier) {
                CompositeInputAttemptIdentifier cin = (CompositeInputAttemptIdentifier)in;
                for (int i = 0; i < cin.getInputIdentifierCount(); ++i) {
                    this.pathToAttemptMap.put(new PathPartition(cin.getPathComponent(), this.partition + i), cin.expand(i));
                }
            } else {
                this.pathToAttemptMap.put(new PathPartition(in.getPathComponent(), 0), in);
            }
            multiplex &= in.isShared();
        }
        if (multiplex) {
            Preconditions.checkArgument((this.partition == 0 ? 1 : 0) != 0, (String)"Shared fetches cannot be done for partitioned input- partition is non-zero (%d)", (Object[])new Object[]{this.partition});
        }
        boolean bl2 = isLocalFetch = this.localDiskFetchEnabled && this.host.equals(this.localHostname) && this.port == this.shufflePort;
        HostFetchResult hostFetchResult = isLocalFetch ? this.setupLocalDiskFetch() : (multiplex ? this.doSharedFetch() : this.doHttpFetch());
        if (hostFetchResult.failedInputs != null && hostFetchResult.failedInputs.length > 0) {
            if (!this.isShutDown.get()) {
                LOG.warn("copyInputs failed for tasks " + Arrays.toString(hostFetchResult.failedInputs));
                for (InputAttemptFetchFailure left : hostFetchResult.failedInputs) {
                    this.fetcherCallback.fetchFailed(this.host, left, hostFetchResult.connectFailed);
                }
            } else if (this.isDebugEnabled) {
                LOG.debug("Ignoring failed fetch reports for " + hostFetchResult.failedInputs.length + " inputs since the fetcher has already been stopped");
            }
        }
        this.shutdown();
        if (hostFetchResult.failedInputs == null && !this.srcAttemptsRemaining.isEmpty() && !multiplex) {
            throw new IOException("server didn't return all expected map outputs: " + this.srcAttemptsRemaining.size() + " left.");
        }
        return hostFetchResult.fetchResult;
    }

    private int findInputs() throws IOException {
        int k = 0;
        for (InputAttemptIdentifier src : this.srcAttemptsRemaining.values()) {
            try {
                if (this.getShuffleInputFileName(src.getPathComponent(), ".index") == null) continue;
                ++k;
            }
            catch (DiskChecker.DiskErrorException diskErrorException) {}
        }
        return k;
    }

    private FileLock getLock() throws OverlappingFileLockException, InterruptedException, IOException {
        File lockFile = this.localFs.pathToFile(new Path(this.lockPath, this.host + ".lock"));
        boolean created = lockFile.createNewFile();
        if (!created && !lockFile.exists()) {
            return null;
        }
        FileChannel lockChannel = new RandomAccessFile(lockFile, "rws").getChannel();
        FileLock xlock = null;
        xlock = lockChannel.tryLock(0L, Long.MAX_VALUE, false);
        if (xlock != null) {
            return xlock;
        }
        lockChannel.close();
        return null;
    }

    private void releaseLock(FileLock lock) throws IOException {
        if (lock != null && lock.isValid()) {
            FileChannel lockChannel = lock.channel();
            lock.release();
            lockChannel.close();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected HostFetchResult doSharedFetch() throws IOException {
        int inputs = this.findInputs();
        if (inputs == this.srcAttemptsRemaining.size()) {
            if (this.isDebugEnabled) {
                LOG.debug("Using the copies found locally");
            }
            return this.doLocalDiskFetch(true);
        }
        if (inputs > 0) {
            if (this.isDebugEnabled) {
                LOG.debug("Found " + this.input + " local fetches right now, using them first");
            }
            return this.doLocalDiskFetch(false);
        }
        FileLock lock = null;
        try {
            lock = this.getLock();
            if (lock == null) {
                HostFetchResult hostFetchResult = new HostFetchResult(new FetchResult(this.host, this.port, this.partition, this.partitionCount, this.srcAttemptsRemaining.values(), "Requeuing as we didn't get a lock"), null, false);
                return hostFetchResult;
            }
            if (this.findInputs() == this.srcAttemptsRemaining.size()) {
                this.releaseLock(lock);
                lock = null;
                HostFetchResult hostFetchResult = this.doLocalDiskFetch(true);
                return hostFetchResult;
            }
            HostFetchResult hostFetchResult = this.doHttpFetch(new CachingCallBack());
            return hostFetchResult;
        }
        catch (OverlappingFileLockException jvmCrossLock) {
            LOG.warn("Double locking detected for " + this.host);
        }
        catch (InterruptedException sleepInterrupted) {
            Thread.currentThread().interrupt();
            LOG.warn("Lock was interrupted for " + this.host);
        }
        finally {
            this.releaseLock(lock);
        }
        if (this.isShutDown.get()) {
            return new HostFetchResult(new FetchResult(this.host, this.port, this.partition, this.partitionCount, this.srcAttemptsRemaining.values()), null, false);
        }
        return this.doHttpFetch();
    }

    @VisibleForTesting
    protected HostFetchResult doHttpFetch() {
        return this.doHttpFetch(null);
    }

    private HostFetchResult setupConnection(Collection<InputAttemptIdentifier> attempts) {
        StringBuilder baseURI = null;
        try {
            baseURI = ShuffleUtils.constructBaseURIForShuffleHandler(this.host, this.port, this.partition, this.partitionCount, this.appId.toString(), this.dagIdentifier, this.httpConnectionParams.isSslShuffle());
            this.url = ShuffleUtils.constructInputURL(baseURI.toString(), attempts, this.httpConnectionParams.isKeepAlive());
            this.httpConnection = ShuffleUtils.getHttpConnection(this.asyncHttp, this.url, this.httpConnectionParams, this.logIdentifier, this.jobTokenSecretMgr);
            this.httpConnection.connect();
        }
        catch (IOException | InterruptedException e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            InputAttemptFetchFailure[] failedFetches = null;
            if (this.isShutDown.get()) {
                if (this.isDebugEnabled) {
                    LOG.debug("Not reporting fetch failure during connection establishment, since an Exception was caught after shutdown." + e.getClass().getName() + ", Message: " + e.getMessage());
                }
            } else {
                failedFetches = InputAttemptFetchFailure.fromAttempts(this.srcAttemptsRemaining.values());
            }
            return new HostFetchResult(new FetchResult(this.host, this.port, this.partition, this.partitionCount, this.srcAttemptsRemaining.values()), failedFetches, true);
        }
        if (this.isShutDown.get()) {
            this.shutdownInternal();
            if (this.isDebugEnabled) {
                LOG.debug("Detected fetcher has been shutdown after connection establishment. Returning");
            }
            return new HostFetchResult(new FetchResult(this.host, this.port, this.partition, this.partitionCount, this.srcAttemptsRemaining.values()), null, false);
        }
        try {
            this.setupConnectionInternal(this.host, attempts);
        }
        catch (IOException e) {
            if (this.isShutDown.get()) {
                if (this.isDebugEnabled) {
                    LOG.debug("Not reporting fetch failure during connection establishment, since an Exception was caught after shutdown." + e.getClass().getName() + ", Message: " + e.getMessage());
                }
            }
            InputAttemptIdentifier firstAttempt = attempts.iterator().next();
            LOG.warn("FETCH_FAILURE: Fetch Failure while connecting from {} to: {}:{}, attempt: {}, url: {} Informing ShuffleManager", new Object[]{this.localHostname, this.host, this.port, firstAttempt, baseURI, e});
            return new HostFetchResult(new FetchResult(this.host, this.port, this.partition, this.partitionCount, this.srcAttemptsRemaining.values()), new InputAttemptFetchFailure[]{new InputAttemptFetchFailure(firstAttempt)}, true);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
        return null;
    }

    protected void setupConnectionInternal(String host, Collection<InputAttemptIdentifier> attempts) throws IOException, InterruptedException {
        this.input = this.httpConnection.getInputStream();
        this.httpConnection.validate();
    }

    @VisibleForTesting
    protected HostFetchResult doHttpFetch(CachingCallBack callback) {
        HostFetchResult connectionsWithRetryResult = this.setupConnection(this.srcAttemptsRemaining.values());
        if (connectionsWithRetryResult != null) {
            return connectionsWithRetryResult;
        }
        if (this.isShutDown.get()) {
            this.shutdownInternal();
            if (this.isDebugEnabled) {
                LOG.debug("Detected fetcher has been shutdown after opening stream. Returning");
            }
            return new HostFetchResult(new FetchResult(this.host, this.port, this.partition, this.partitionCount, this.srcAttemptsRemaining.values()), null, false);
        }
        InputAttemptFetchFailure[] failedInputs = null;
        while (!this.srcAttemptsRemaining.isEmpty() && failedInputs == null) {
            InputAttemptIdentifier inputAttemptIdentifier = this.srcAttemptsRemaining.entrySet().iterator().next().getValue();
            if (this.isShutDown.get()) {
                this.shutdownInternal(true);
                if (this.isDebugEnabled) {
                    LOG.debug("Fetcher already shutdown. Aborting queued fetches for " + this.srcAttemptsRemaining.size() + " inputs");
                }
                return new HostFetchResult(new FetchResult(this.host, this.port, this.partition, this.partitionCount, this.srcAttemptsRemaining.values()), null, false);
            }
            try {
                failedInputs = this.fetchInputs(this.input, callback, inputAttemptIdentifier);
            }
            catch (FetcherReadTimeoutException e) {
                this.shutdownInternal(true);
                if (this.isShutDown.get()) {
                    if (this.isDebugEnabled) {
                        LOG.debug("Fetcher already shutdown. Aborting reconnection and queued fetches for " + this.srcAttemptsRemaining.size() + " inputs");
                    }
                    return new HostFetchResult(new FetchResult(this.host, this.port, this.partition, this.partitionCount, this.srcAttemptsRemaining.values()), null, false);
                }
                connectionsWithRetryResult = this.setupConnection(this.srcAttemptsRemaining.values());
                if (connectionsWithRetryResult == null) continue;
                break;
            }
        }
        if (this.isShutDown.get() && failedInputs != null && failedInputs.length > 0) {
            if (this.isDebugEnabled) {
                LOG.debug("Fetcher already shutdown. Not reporting fetch failures for: " + failedInputs.length + " failed inputs");
            }
            failedInputs = null;
        }
        return new HostFetchResult(new FetchResult(this.host, this.port, this.partition, this.partitionCount, this.srcAttemptsRemaining.values()), failedInputs, false);
    }

    @VisibleForTesting
    protected HostFetchResult setupLocalDiskFetch() {
        return this.doLocalDiskFetch(true);
    }

    @VisibleForTesting
    private HostFetchResult doLocalDiskFetch(boolean failMissing) {
        Iterator<Map.Entry<String, InputAttemptIdentifier>> iterator = this.srcAttemptsRemaining.entrySet().iterator();
        while (iterator.hasNext()) {
            boolean hasFailures = false;
            if (this.isShutDown.get()) {
                if (!this.isDebugEnabled) break;
                LOG.debug("Already shutdown. Skipping fetch for " + this.srcAttemptsRemaining.size() + " inputs");
                break;
            }
            InputAttemptIdentifier srcAttemptId = iterator.next().getValue();
            for (int curPartition = 0; curPartition < this.partitionCount; ++curPartition) {
                int reduceId = curPartition + this.partition;
                srcAttemptId = this.pathToAttemptMap.get(new PathPartition(srcAttemptId.getPathComponent(), reduceId));
                long startTime = System.currentTimeMillis();
                LocalDiskFetchedInput fetchedInput = null;
                try {
                    TezIndexRecord idxRecord = this.getTezIndexRecord(srcAttemptId, reduceId);
                    fetchedInput = new LocalDiskFetchedInput(idxRecord.getStartOffset(), idxRecord.getPartLength(), srcAttemptId, this.getShuffleInputFileName(srcAttemptId.getPathComponent(), null), this.conf, new FetchedInputCallback(){

                        @Override
                        public void fetchComplete(FetchedInput fetchedInput) {
                        }

                        @Override
                        public void fetchFailed(FetchedInput fetchedInput) {
                        }

                        @Override
                        public void freeResources(FetchedInput fetchedInput) {
                        }
                    });
                    if (this.isDebugEnabled) {
                        LOG.debug("fetcher about to shuffle output of srcAttempt (direct disk)" + srcAttemptId + " decomp: " + idxRecord.getRawLength() + " len: " + idxRecord.getPartLength() + " to " + ((FetchedInput)fetchedInput).getType());
                    }
                    long endTime = System.currentTimeMillis();
                    this.fetcherCallback.fetchSucceeded(this.host, srcAttemptId, fetchedInput, idxRecord.getPartLength(), idxRecord.getRawLength(), endTime - startTime);
                    continue;
                }
                catch (IOException | InternalError e) {
                    hasFailures = true;
                    this.cleanupFetchedInput(fetchedInput);
                    if (this.isShutDown.get()) {
                        if (!this.isDebugEnabled) break;
                        LOG.debug("Already shutdown. Ignoring Local Fetch Failure for " + srcAttemptId + " from host " + this.host + " : " + e.getClass().getName() + ", message=" + e.getMessage());
                        break;
                    }
                    if (!failMissing) continue;
                    LOG.warn("Failed to shuffle output of " + srcAttemptId + " from " + this.host + "(local fetch)", e);
                }
            }
            if (hasFailures) continue;
            iterator.remove();
        }
        InputAttemptFetchFailure[] failedFetches = null;
        if (failMissing && this.srcAttemptsRemaining.size() > 0) {
            if (this.isShutDown.get()) {
                if (this.isDebugEnabled) {
                    LOG.debug("Already shutdown, not reporting fetch failures for: " + this.srcAttemptsRemaining.size() + " remaining inputs");
                }
            } else {
                failedFetches = InputAttemptFetchFailure.fromAttemptsLocalFetchFailure(this.srcAttemptsRemaining.values());
            }
        }
        return new HostFetchResult(new FetchResult(this.host, this.port, this.partition, this.partitionCount, this.srcAttemptsRemaining.values()), failedFetches, false);
    }

    @VisibleForTesting
    protected TezIndexRecord getTezIndexRecord(InputAttemptIdentifier srcAttemptId, int partition) throws IOException {
        Path indexFile = this.getShuffleInputFileName(srcAttemptId.getPathComponent(), ".index");
        TezSpillRecord spillRecord = new TezSpillRecord(indexFile, (FileSystem)this.localFs);
        TezIndexRecord idxRecord = spillRecord.getIndex(partition);
        return idxRecord;
    }

    private final String getMapOutputFile(String pathComponent) {
        String outputPath = "output/" + pathComponent + "/file.out";
        if (ShuffleUtils.isTezShuffleHandler(this.conf)) {
            return "dag_" + this.dagIdentifier + "/" + outputPath;
        }
        return outputPath;
    }

    @VisibleForTesting
    protected Path getShuffleInputFileName(String pathComponent, String suffix) throws IOException {
        suffix = suffix != null ? suffix : "";
        String pathFromLocalDir = this.getMapOutputFile(pathComponent) + suffix;
        return this.localDirAllocator.getLocalPathToRead(pathFromLocalDir, this.conf);
    }

    @VisibleForTesting
    public Map<PathPartition, InputAttemptIdentifier> getPathToAttemptMap() {
        return this.pathToAttemptMap;
    }

    public void shutdown() {
        if (!this.isShutDown.getAndSet(true)) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Shutting down fetcher for host: " + this.host);
            }
            this.shutdownInternal();
        }
    }

    private void shutdownInternal() {
        this.shutdownInternal(false);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void shutdownInternal(boolean disconnect) {
        AtomicBoolean atomicBoolean = this.isShutDown;
        synchronized (atomicBoolean) {
            block6: {
                try {
                    if (this.httpConnection != null) {
                        this.httpConnection.cleanup(disconnect);
                    }
                }
                catch (IOException e) {
                    LOG.info("Exception while shutting down fetcher on " + this.logIdentifier + " : " + e.getMessage());
                    if (!this.isDebugEnabled) break block6;
                    LOG.debug("", (Throwable)e);
                }
            }
        }
    }

    @VisibleForTesting
    InputAttemptFetchFailure[] fetchInputs(DataInputStream input, CachingCallBack callback, InputAttemptIdentifier inputAttemptIdentifier) throws FetcherReadTimeoutException {
        FetchedInput fetchedInput = null;
        InputAttemptIdentifier srcAttemptId = null;
        long decompressedLength = 0L;
        long compressedLength = 0L;
        try {
            long startTime = System.currentTimeMillis();
            int partitionCount = 1;
            if (this.compositeFetch) {
                partitionCount = WritableUtils.readVInt((DataInput)input);
            }
            ArrayList<MapOutputStat> mapOutputStats = new ArrayList<MapOutputStat>(partitionCount);
            for (int mapOutputIndex = 0; mapOutputIndex < partitionCount; ++mapOutputIndex) {
                MapOutputStat mapOutputStat = null;
                int responsePartition = -1;
                String pathComponent = null;
                try {
                    ShuffleHeader header = new ShuffleHeader();
                    header.readFields(input);
                    pathComponent = header.getMapId();
                    if (!pathComponent.startsWith("attempt")) {
                        if (pathComponent.startsWith(ShuffleHandlerError.DISK_ERROR_EXCEPTION.toString())) {
                            LOG.warn("Invalid map id: " + header.getMapId() + ", expected to start with attempt, partition: " + header.getPartition() + " while fetching " + inputAttemptIdentifier);
                            return new InputAttemptFetchFailure[]{InputAttemptFetchFailure.fromDiskErrorAtSource(inputAttemptIdentifier)};
                        }
                        throw new IllegalArgumentException("Invalid map id: " + header.getMapId() + ", expected to start with attempt, partition: " + header.getPartition() + " while fetching " + inputAttemptIdentifier);
                    }
                    srcAttemptId = this.pathToAttemptMap.get(new PathPartition(pathComponent, header.getPartition()));
                    if (srcAttemptId == null) {
                        throw new IllegalArgumentException("Source attempt not found for map id: " + header.getMapId() + ", partition: " + header.getPartition() + " while fetching " + inputAttemptIdentifier);
                    }
                    if (header.getCompressedLength() == 0L) continue;
                    mapOutputStat = new MapOutputStat(srcAttemptId, header.getUncompressedLength(), header.getCompressedLength(), header.getPartition());
                    mapOutputStats.add(mapOutputStat);
                    responsePartition = header.getPartition();
                }
                catch (IllegalArgumentException e) {
                    if (!this.isShutDown.get()) {
                        LOG.warn("Invalid src id ", (Throwable)e);
                        return InputAttemptFetchFailure.fromAttempts(this.srcAttemptsRemaining.values());
                    }
                    if (this.isDebugEnabled) {
                        LOG.debug("Already shutdown. Ignoring badId error with message: " + e.getMessage());
                    }
                    return null;
                }
                if (!this.verifySanity(mapOutputStat.compressedLength, mapOutputStat.decompressedLength, responsePartition, mapOutputStat.srcAttemptId, pathComponent)) {
                    if (!this.isShutDown.get()) {
                        srcAttemptId = mapOutputStat.srcAttemptId;
                        if (srcAttemptId == null) {
                            LOG.warn("Was expecting " + this.getNextRemainingAttempt() + " but got null");
                            srcAttemptId = this.getNextRemainingAttempt();
                        }
                        assert (srcAttemptId != null);
                        return new InputAttemptFetchFailure[]{InputAttemptFetchFailure.fromAttempt(srcAttemptId)};
                    }
                    if (this.isDebugEnabled) {
                        LOG.debug("Already shutdown. Ignoring verification failure.");
                    }
                    return null;
                }
                if (!this.isDebugEnabled) continue;
                LOG.debug("header: " + mapOutputStat.srcAttemptId + ", len: " + mapOutputStat.compressedLength + ", decomp len: " + mapOutputStat.decompressedLength);
            }
            for (MapOutputStat mapOutputStat : mapOutputStats) {
                srcAttemptId = mapOutputStat.srcAttemptId;
                decompressedLength = mapOutputStat.decompressedLength;
                compressedLength = mapOutputStat.compressedLength;
                fetchedInput = srcAttemptId.isShared() && callback != null ? this.inputManager.allocateType(FetchedInput.Type.DISK, decompressedLength, compressedLength, srcAttemptId) : this.inputManager.allocate(decompressedLength, compressedLength, srcAttemptId);
                if (this.isDebugEnabled) {
                    LOG.debug("fetcher about to shuffle output of srcAttempt " + fetchedInput.getInputAttemptIdentifier() + " decomp: " + decompressedLength + " len: " + compressedLength + " to " + fetchedInput.getType());
                }
                if (fetchedInput.getType() == FetchedInput.Type.MEMORY) {
                    ShuffleUtils.shuffleToMemory(((MemoryFetchedInput)fetchedInput).getBytes(), input, (int)decompressedLength, (int)compressedLength, this.codec, this.ifileReadAhead, this.ifileReadAheadLength, LOG, fetchedInput.getInputAttemptIdentifier());
                } else if (fetchedInput.getType() == FetchedInput.Type.DISK) {
                    ShuffleUtils.shuffleToDisk(((DiskFetchedInput)fetchedInput).getOutputStream(), this.host + ":" + this.port, input, compressedLength, decompressedLength, LOG, fetchedInput.getInputAttemptIdentifier(), this.ifileReadAhead, this.ifileReadAheadLength, this.verifyDiskChecksum);
                } else {
                    throw new TezUncheckedException("Bad fetchedInput type while fetching shuffle data " + fetchedInput);
                }
                if (srcAttemptId.isShared() && callback != null) {
                    callback.cache(this.host, srcAttemptId, fetchedInput, compressedLength, decompressedLength);
                }
                long endTime = System.currentTimeMillis();
                this.retryStartTime = 0L;
                this.fetcherCallback.fetchSucceeded(this.host, srcAttemptId, fetchedInput, compressedLength, decompressedLength, endTime - startTime);
            }
            this.srcAttemptsRemaining.remove(inputAttemptIdentifier.toString());
        }
        catch (IOException | InternalError ioe) {
            if (this.isShutDown.get()) {
                this.cleanupFetchedInput(fetchedInput);
                if (this.isDebugEnabled) {
                    LOG.debug("Already shutdown. Ignoring exception during fetch " + ioe.getClass().getName() + ", Message: " + ioe.getMessage());
                }
                return null;
            }
            if (this.shouldRetry(srcAttemptId, ioe)) {
                this.cleanupFetchedInput(fetchedInput);
                throw new FetcherReadTimeoutException(ioe);
            }
            if (srcAttemptId == null || fetchedInput == null) {
                LOG.info("fetcher failed to read map header" + srcAttemptId + " decomp: " + decompressedLength + ", " + compressedLength, ioe);
                this.cleanupFetchedInput(fetchedInput);
                if (srcAttemptId == null) {
                    return InputAttemptFetchFailure.fromAttempts(this.srcAttemptsRemaining.values());
                }
                return new InputAttemptFetchFailure[]{new InputAttemptFetchFailure(srcAttemptId)};
            }
            LOG.warn("Failed to shuffle output of " + srcAttemptId + " from " + this.host + " (to " + this.localHostname + ")", ioe);
            this.cleanupFetchedInput(fetchedInput);
            return new InputAttemptFetchFailure[]{new InputAttemptFetchFailure(srcAttemptId)};
        }
        return null;
    }

    private void cleanupFetchedInput(FetchedInput fetchedInput) {
        if (fetchedInput != null) {
            try {
                fetchedInput.abort();
            }
            catch (IOException e) {
                LOG.info("Failure to cleanup fetchedInput: " + fetchedInput);
            }
        }
    }

    private boolean shouldRetry(InputAttemptIdentifier srcAttemptId, Throwable ioe) {
        if (!(ioe instanceof SocketTimeoutException)) {
            return false;
        }
        long currentTime = System.currentTimeMillis();
        if (this.retryStartTime == 0L) {
            this.retryStartTime = currentTime;
        }
        if (currentTime - this.retryStartTime - (long)this.httpConnectionParams.getReadTimeout() < 0L) {
            LOG.warn("Shuffle output from " + srcAttemptId + " failed (to " + this.localHostname + "), retry it.");
            return true;
        }
        LOG.warn("Timeout for copying MapOutput with retry on host " + this.host + "after " + this.httpConnectionParams.getReadTimeout() + "milliseconds.");
        return false;
    }

    private boolean verifySanity(long compressedLength, long decompressedLength, int fetchPartition, InputAttemptIdentifier srcAttemptId, String pathComponent) {
        if (compressedLength < 0L || decompressedLength < 0L) {
            LOG.warn(" invalid lengths in input header -> headerPathComponent: " + pathComponent + ", nextRemainingSrcAttemptId: " + this.getNextRemainingAttempt() + ", mappedSrcAttemptId: " + srcAttemptId + " len: " + compressedLength + ", decomp len: " + decompressedLength);
            return false;
        }
        if (fetchPartition < this.partition || fetchPartition >= this.partition + this.partitionCount) {
            LOG.warn(" data for the wrong reduce -> headerPathComponent: " + pathComponent + "nextRemainingSrcAttemptId: " + this.getNextRemainingAttempt() + ", mappedSrcAttemptId: " + srcAttemptId + " len: " + compressedLength + " decomp len: " + decompressedLength + " for reduce " + fetchPartition);
            return false;
        }
        return true;
    }

    private InputAttemptIdentifier getNextRemainingAttempt() {
        if (this.srcAttemptsRemaining.size() > 0) {
            return this.srcAttemptsRemaining.values().iterator().next();
        }
        return null;
    }

    public int hashCode() {
        return this.fetcherIdentifier;
    }

    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null) {
            return false;
        }
        if (((Object)((Object)this)).getClass() != obj.getClass()) {
            return false;
        }
        Fetcher other = (Fetcher)((Object)obj);
        return this.fetcherIdentifier == other.fetcherIdentifier;
    }

    public static class PathPartition {
        final String path;
        final int partition;

        PathPartition(String path, int partition) {
            this.path = path;
            this.partition = partition;
        }

        public int hashCode() {
            int prime = 31;
            int result = 1;
            result = 31 * result + (this.path == null ? 0 : this.path.hashCode());
            result = 31 * result + this.partition;
            return result;
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null) {
                return false;
            }
            if (this.getClass() != obj.getClass()) {
                return false;
            }
            PathPartition other = (PathPartition)obj;
            if (this.path == null ? other.path != null : !this.path.equals(other.path)) {
                return false;
            }
            return this.partition == other.partition;
        }

        public String toString() {
            return "PathPartition [path=" + this.path + ", partition=" + this.partition + "]";
        }
    }

    static class HostFetchResult {
        private final FetchResult fetchResult;
        private final InputAttemptFetchFailure[] failedInputs;
        private final boolean connectFailed;

        public HostFetchResult(FetchResult fetchResult, InputAttemptFetchFailure[] failedInputs, boolean connectFailed) {
            this.fetchResult = fetchResult;
            this.failedInputs = failedInputs;
            this.connectFailed = connectFailed;
        }
    }

    private final class CachingCallBack {
        private CachingCallBack() {
        }

        public void cache(String host, InputAttemptIdentifier srcAttemptId, FetchedInput fetchedInput, long compressedLength, long decompressedLength) {
            try {
                TezIndexRecord indexRec;
                Preconditions.checkArgument((Fetcher.this.partition == 0 ? 1 : 0) != 0, (Object)"Partition == 0");
                String tmpSuffix = System.currentTimeMillis() + ".tmp";
                String finalOutput = Fetcher.this.getMapOutputFile(srcAttemptId.getPathComponent());
                Path outputPath = Fetcher.this.localDirAllocator.getLocalPathForWrite(finalOutput, compressedLength, Fetcher.this.conf);
                TezSpillRecord spillRec = new TezSpillRecord(1);
                Path tmpIndex = outputPath.suffix(".index" + tmpSuffix);
                if (Fetcher.this.localFs.exists(tmpIndex)) {
                    LOG.warn("Found duplicate instance of input index file " + tmpIndex);
                    return;
                }
                Path tmpPath = null;
                switch (fetchedInput.getType()) {
                    case DISK: {
                        DiskFetchedInput input = (DiskFetchedInput)fetchedInput;
                        indexRec = new TezIndexRecord(0L, decompressedLength, compressedLength);
                        Fetcher.this.localFs.mkdirs(outputPath.getParent());
                        tmpPath = outputPath.suffix(tmpSuffix);
                        Fetcher.this.localFs.copyFromLocalFile(input.getInputPath(), tmpPath);
                        boolean renamed = Fetcher.this.localFs.rename(tmpPath, outputPath);
                        if (renamed) break;
                        LOG.warn("Could not rename to cached file name " + outputPath);
                        Fetcher.this.localFs.delete(tmpPath, false);
                        return;
                    }
                    default: {
                        LOG.warn("Incorrect use of CachingCallback for " + srcAttemptId);
                        return;
                    }
                }
                spillRec.putIndex(indexRec, 0);
                spillRec.writeToFile(tmpIndex, Fetcher.this.conf, (FileSystem)Fetcher.this.localFs);
                boolean renamed = Fetcher.this.localFs.rename(tmpIndex, outputPath.suffix(".index"));
                if (!renamed) {
                    Fetcher.this.localFs.delete(tmpIndex, false);
                    Fetcher.this.localFs.delete(outputPath, false);
                    LOG.warn("Could not rename the index file to " + outputPath.suffix(".index"));
                    return;
                }
            }
            catch (IOException ioe) {
                LOG.warn("Cache threw an error " + ioe);
            }
        }
    }

    private static class MapOutputStat {
        final InputAttemptIdentifier srcAttemptId;
        final long decompressedLength;
        final long compressedLength;
        final int forReduce;

        MapOutputStat(InputAttemptIdentifier srcAttemptId, long decompressedLength, long compressedLength, int forReduce) {
            this.srcAttemptId = srcAttemptId;
            this.decompressedLength = decompressedLength;
            this.compressedLength = compressedLength;
            this.forReduce = forReduce;
        }

        public String toString() {
            return "id: " + this.srcAttemptId + ", decompressed length: " + this.decompressedLength + ", compressed length: " + this.compressedLength + ", reduce: " + this.forReduce;
        }
    }

    public static class FetcherBuilder {
        private Fetcher fetcher;
        private boolean workAssigned = false;

        public FetcherBuilder(FetcherCallback fetcherCallback, HttpConnectionParams params, FetchedInputAllocator inputManager, InputContext inputContext, JobTokenSecretManager jobTokenSecretMgr, Configuration conf, boolean localDiskFetchEnabled, String localHostname, int shufflePort, boolean asyncHttp, boolean verifyDiskChecksum, boolean compositeFetch) {
            this.fetcher = new Fetcher(fetcherCallback, params, inputManager, inputContext, jobTokenSecretMgr, conf, null, null, null, localDiskFetchEnabled, false, localHostname, shufflePort, asyncHttp, verifyDiskChecksum, compositeFetch);
        }

        public FetcherBuilder(FetcherCallback fetcherCallback, HttpConnectionParams params, FetchedInputAllocator inputManager, InputContext inputContext, JobTokenSecretManager jobTokenSecretMgr, Configuration conf, RawLocalFileSystem localFs, LocalDirAllocator localDirAllocator, Path lockPath, boolean localDiskFetchEnabled, boolean sharedFetchEnabled, String localHostname, int shufflePort, boolean asyncHttp, boolean verifyDiskChecksum, boolean compositeFetch, boolean enableFetcherTestingErrors) {
            this.fetcher = enableFetcherTestingErrors ? new FetcherWithInjectableErrors(fetcherCallback, params, inputManager, inputContext, jobTokenSecretMgr, conf, localFs, localDirAllocator, lockPath, localDiskFetchEnabled, sharedFetchEnabled, localHostname, shufflePort, asyncHttp, verifyDiskChecksum, compositeFetch) : new Fetcher(fetcherCallback, params, inputManager, inputContext, jobTokenSecretMgr, conf, localFs, localDirAllocator, lockPath, localDiskFetchEnabled, sharedFetchEnabled, localHostname, shufflePort, asyncHttp, verifyDiskChecksum, compositeFetch);
        }

        public FetcherBuilder setHttpConnectionParameters(HttpConnectionParams httpParams) {
            this.fetcher.httpConnectionParams = httpParams;
            return this;
        }

        public FetcherBuilder setCompressionParameters(CompressionCodec codec) {
            this.fetcher.codec = codec;
            return this;
        }

        public FetcherBuilder setIFileParams(boolean readAhead, int readAheadBytes) {
            this.fetcher.ifileReadAhead = readAhead;
            this.fetcher.ifileReadAheadLength = readAheadBytes;
            return this;
        }

        public FetcherBuilder assignWork(String host, int port, int partition, int partitionCount, List<InputAttemptIdentifier> inputs) {
            this.fetcher.host = host;
            this.fetcher.port = port;
            this.fetcher.partition = partition;
            this.fetcher.partitionCount = partitionCount;
            this.fetcher.srcAttempts = inputs;
            this.workAssigned = true;
            return this;
        }

        public Fetcher build() {
            Preconditions.checkState((this.workAssigned ? 1 : 0) != 0, (Object)"Cannot build a fetcher withot assigning work to it");
            return this.fetcher;
        }
    }
}

