package org.apache.tez.runtime.library.common.shuffle;

import com.google.common.annotations.VisibleForTesting;
import java.io.DataInputStream;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.net.SocketTimeoutException;
import java.net.URL;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.tez.common.CallableWithNdc;
import org.apache.tez.common.Preconditions;
import org.apache.tez.common.TezRuntimeFrameworkConfigs;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.http.BaseHttpConnection;
import org.apache.tez.http.HttpConnectionParams;
import org.apache.tez.runtime.api.InputContext;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.common.CompositeInputAttemptIdentifier;
import org.apache.tez.runtime.library.common.Constants;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.shuffle.FetchedInput;
import org.apache.tez.runtime.library.common.shuffle.api.ShuffleHandlerError;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleHeader;
import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
import org.apache.tez.runtime.library.exceptions.FetcherReadTimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/tez/runtime/library/common/shuffle/Fetcher.class */
public class Fetcher extends CallableWithNdc<FetchResult> {
    private static final Logger LOG;
    private static final AtomicInteger fetcherIdGen;
    private final Configuration conf;
    private final int shufflePort;
    private CompressionCodec codec;
    private final JobTokenSecretManager jobTokenSecretMgr;
    private final FetcherCallback fetcherCallback;
    private final FetchedInputAllocator inputManager;
    private final ApplicationId appId;
    private final int dagIdentifier;
    private final String logIdentifier;
    private final String localHostname;
    private List<InputAttemptIdentifier> srcAttempts;

    @VisibleForTesting
    Map<String, InputAttemptIdentifier> srcAttemptsRemaining;
    private String host;
    private int port;
    private int partition;
    private int partitionCount;
    private URL url;
    private volatile DataInputStream input;
    BaseHttpConnection httpConnection;
    private HttpConnectionParams httpConnectionParams;
    private final boolean localDiskFetchEnabled;
    private final boolean sharedFetchEnabled;
    private final LocalDirAllocator localDirAllocator;
    private final Path lockPath;
    private final RawLocalFileSystem localFs;
    private final boolean asyncHttp;
    private final boolean compositeFetch;
    private final boolean verifyDiskChecksum;
    static final /* synthetic */ boolean $assertionsDisabled;
    private boolean ifileReadAhead = true;
    private int ifileReadAheadLength = TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_BYTES_DEFAULT;
    private final AtomicBoolean isShutDown = new AtomicBoolean(false);
    private long retryStartTime = 0;
    private final boolean isDebugEnabled = LOG.isDebugEnabled();
    private final Map<PathPartition, InputAttemptIdentifier> pathToAttemptMap = new HashMap();
    protected final int fetcherIdentifier = fetcherIdGen.getAndIncrement();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/tez/runtime/library/common/shuffle/Fetcher$CachingCallBack.class */
    public final class CachingCallBack {
        private CachingCallBack() {
        }

        public void cache(String str, InputAttemptIdentifier inputAttemptIdentifier, FetchedInput fetchedInput, long j, long j2) {
            try {
                Preconditions.checkArgument(Fetcher.this.partition == 0, "Partition == 0");
                String str2 = TezRuntimeFrameworkConfigs.TEZ_RUNTIME_METRICS_SESSION_ID_DEFAULT + System.currentTimeMillis() + ".tmp";
                Path localPathForWrite = Fetcher.this.localDirAllocator.getLocalPathForWrite(Fetcher.this.getMapOutputFile(inputAttemptIdentifier.getPathComponent()), j, Fetcher.this.conf);
                TezSpillRecord tezSpillRecord = new TezSpillRecord(1);
                Path suffix = localPathForWrite.suffix(".index" + str2);
                if (Fetcher.this.localFs.exists(suffix)) {
                    Fetcher.LOG.warn("Found duplicate instance of input index file " + suffix);
                    return;
                }
                switch (fetchedInput.getType()) {
                    case DISK:
                        TezIndexRecord tezIndexRecord = new TezIndexRecord(0L, j2, j);
                        Fetcher.this.localFs.mkdirs(localPathForWrite.getParent());
                        Path suffix2 = localPathForWrite.suffix(str2);
                        Fetcher.this.localFs.copyFromLocalFile(((DiskFetchedInput) fetchedInput).getInputPath(), suffix2);
                        if (!Fetcher.this.localFs.rename(suffix2, localPathForWrite)) {
                            Fetcher.LOG.warn("Could not rename to cached file name " + localPathForWrite);
                            Fetcher.this.localFs.delete(suffix2, false);
                            return;
                        }
                        tezSpillRecord.putIndex(tezIndexRecord, 0);
                        tezSpillRecord.writeToFile(suffix, Fetcher.this.conf, Fetcher.this.localFs);
                        if (Fetcher.this.localFs.rename(suffix, localPathForWrite.suffix(".index"))) {
                            return;
                        }
                        Fetcher.this.localFs.delete(suffix, false);
                        Fetcher.this.localFs.delete(localPathForWrite, false);
                        Fetcher.LOG.warn("Could not rename the index file to " + localPathForWrite.suffix(".index"));
                        return;
                    default:
                        Fetcher.LOG.warn("Incorrect use of CachingCallback for " + inputAttemptIdentifier);
                        return;
                }
            } catch (IOException e) {
                Fetcher.LOG.warn("Cache threw an error " + e);
            }
        }
    }

    /* loaded from: input_file:org/apache/tez/runtime/library/common/shuffle/Fetcher$FetcherBuilder.class */
    public static class FetcherBuilder {
        private Fetcher fetcher;
        private boolean workAssigned = false;

        public FetcherBuilder(FetcherCallback fetcherCallback, HttpConnectionParams httpConnectionParams, FetchedInputAllocator fetchedInputAllocator, InputContext inputContext, JobTokenSecretManager jobTokenSecretManager, Configuration configuration, boolean z, String str, int i, boolean z2, boolean z3, boolean z4) {
            this.fetcher = new Fetcher(fetcherCallback, httpConnectionParams, fetchedInputAllocator, inputContext, jobTokenSecretManager, configuration, null, null, null, z, false, str, i, z2, z3, z4);
        }

        public FetcherBuilder(FetcherCallback fetcherCallback, HttpConnectionParams httpConnectionParams, FetchedInputAllocator fetchedInputAllocator, InputContext inputContext, JobTokenSecretManager jobTokenSecretManager, Configuration configuration, RawLocalFileSystem rawLocalFileSystem, LocalDirAllocator localDirAllocator, Path path, boolean z, boolean z2, String str, int i, boolean z3, boolean z4, boolean z5, boolean z6) {
            if (z6) {
                this.fetcher = new FetcherWithInjectableErrors(fetcherCallback, httpConnectionParams, fetchedInputAllocator, inputContext, jobTokenSecretManager, configuration, rawLocalFileSystem, localDirAllocator, path, z, z2, str, i, z3, z4, z5);
            } else {
                this.fetcher = new Fetcher(fetcherCallback, httpConnectionParams, fetchedInputAllocator, inputContext, jobTokenSecretManager, configuration, rawLocalFileSystem, localDirAllocator, path, z, z2, str, i, z3, z4, z5);
            }
        }

        public FetcherBuilder setHttpConnectionParameters(HttpConnectionParams httpConnectionParams) {
            this.fetcher.httpConnectionParams = httpConnectionParams;
            return this;
        }

        public FetcherBuilder setCompressionParameters(CompressionCodec compressionCodec) {
            this.fetcher.codec = compressionCodec;
            return this;
        }

        public FetcherBuilder setIFileParams(boolean z, int i) {
            this.fetcher.ifileReadAhead = z;
            this.fetcher.ifileReadAheadLength = i;
            return this;
        }

        public FetcherBuilder assignWork(String str, int i, int i2, int i3, List<InputAttemptIdentifier> list) {
            this.fetcher.host = str;
            this.fetcher.port = i;
            this.fetcher.partition = i2;
            this.fetcher.partitionCount = i3;
            this.fetcher.srcAttempts = list;
            this.workAssigned = true;
            return this;
        }

        public Fetcher build() {
            Preconditions.checkState(this.workAssigned, "Cannot build a fetcher withot assigning work to it");
            return this.fetcher;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/tez/runtime/library/common/shuffle/Fetcher$HostFetchResult.class */
    public static class HostFetchResult {
        private final FetchResult fetchResult;
        private final InputAttemptFetchFailure[] failedInputs;
        private final boolean connectFailed;

        public HostFetchResult(FetchResult fetchResult, InputAttemptFetchFailure[] inputAttemptFetchFailureArr, boolean z) {
            this.fetchResult = fetchResult;
            this.failedInputs = inputAttemptFetchFailureArr;
            this.connectFailed = z;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/tez/runtime/library/common/shuffle/Fetcher$MapOutputStat.class */
    public static class MapOutputStat {
        final InputAttemptIdentifier srcAttemptId;
        final long decompressedLength;
        final long compressedLength;
        final int forReduce;

        MapOutputStat(InputAttemptIdentifier inputAttemptIdentifier, long j, long j2, int i) {
            this.srcAttemptId = inputAttemptIdentifier;
            this.decompressedLength = j;
            this.compressedLength = j2;
            this.forReduce = i;
        }

        public String toString() {
            return "id: " + this.srcAttemptId + ", decompressed length: " + this.decompressedLength + ", compressed length: " + this.compressedLength + ", reduce: " + this.forReduce;
        }
    }

    /* loaded from: input_file:org/apache/tez/runtime/library/common/shuffle/Fetcher$PathPartition.class */
    public static class PathPartition {
        final String path;
        final int partition;

        PathPartition(String str, int i) {
            this.path = str;
            this.partition = i;
        }

        public int hashCode() {
            return (31 * ((31 * 1) + (this.path == null ? 0 : this.path.hashCode()))) + this.partition;
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            PathPartition pathPartition = (PathPartition) obj;
            if (this.path == null) {
                if (pathPartition.path != null) {
                    return false;
                }
            } else if (!this.path.equals(pathPartition.path)) {
                return false;
            }
            return this.partition == pathPartition.partition;
        }

        public String toString() {
            return "PathPartition [path=" + this.path + ", partition=" + this.partition + "]";
        }
    }

    @VisibleForTesting
    public List<InputAttemptIdentifier> getSrcAttempts() {
        return this.srcAttempts;
    }

    @VisibleForTesting
    public String getHost() {
        return this.host;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Fetcher(FetcherCallback fetcherCallback, HttpConnectionParams httpConnectionParams, FetchedInputAllocator fetchedInputAllocator, InputContext inputContext, JobTokenSecretManager jobTokenSecretManager, Configuration configuration, RawLocalFileSystem rawLocalFileSystem, LocalDirAllocator localDirAllocator, Path path, boolean z, boolean z2, String str, int i, boolean z3, boolean z4, boolean z5) {
        this.asyncHttp = z3;
        this.verifyDiskChecksum = z4;
        this.fetcherCallback = fetcherCallback;
        this.inputManager = fetchedInputAllocator;
        this.jobTokenSecretMgr = jobTokenSecretManager;
        this.appId = inputContext.getApplicationId();
        this.dagIdentifier = inputContext.getDagIdentifier();
        this.httpConnectionParams = httpConnectionParams;
        this.conf = configuration;
        this.localDiskFetchEnabled = z;
        this.sharedFetchEnabled = z2;
        this.logIdentifier = " fetcher [" + (TezUtilsInternal.cleanVertexName(inputContext.getSourceVertexName()) + " -> " + TezUtilsInternal.cleanVertexName(inputContext.getTaskVertexName())) + "] " + this.fetcherIdentifier;
        this.localFs = rawLocalFileSystem;
        this.localDirAllocator = localDirAllocator;
        this.lockPath = path;
        this.localHostname = str;
        this.shufflePort = i;
        this.compositeFetch = z5;
        try {
            if (this.sharedFetchEnabled) {
                this.localFs.mkdirs(this.lockPath);
            }
        } catch (Exception e) {
            LOG.warn("Error initializing local dirs for shared transfer " + e);
        }
    }

    void populateRemainingMap(List<InputAttemptIdentifier> list) {
        if (this.srcAttemptsRemaining == null) {
            this.srcAttemptsRemaining = new LinkedHashMap(list.size());
        }
        for (InputAttemptIdentifier inputAttemptIdentifier : list) {
            this.srcAttemptsRemaining.put(inputAttemptIdentifier.toString(), inputAttemptIdentifier);
        }
    }

    /* renamed from: callInternal, reason: merged with bridge method [inline-methods] */
    public FetchResult m217callInternal() throws Exception {
        boolean z = this.sharedFetchEnabled && this.localDiskFetchEnabled;
        if (this.srcAttempts.size() == 0) {
            return new FetchResult(this.host, this.port, this.partition, this.partitionCount, this.srcAttempts);
        }
        populateRemainingMap(this.srcAttempts);
        for (InputAttemptIdentifier inputAttemptIdentifier : this.srcAttemptsRemaining.values()) {
            if (inputAttemptIdentifier instanceof CompositeInputAttemptIdentifier) {
                CompositeInputAttemptIdentifier compositeInputAttemptIdentifier = (CompositeInputAttemptIdentifier) inputAttemptIdentifier;
                for (int i = 0; i < compositeInputAttemptIdentifier.getInputIdentifierCount(); i++) {
                    this.pathToAttemptMap.put(new PathPartition(compositeInputAttemptIdentifier.getPathComponent(), this.partition + i), compositeInputAttemptIdentifier.expand(i));
                }
            } else {
                this.pathToAttemptMap.put(new PathPartition(inputAttemptIdentifier.getPathComponent(), 0), inputAttemptIdentifier);
            }
            z &= inputAttemptIdentifier.isShared();
        }
        if (z) {
            Preconditions.checkArgument(this.partition == 0, "Shared fetches cannot be done for partitioned input- partition is non-zero (%d)", new Object[]{Integer.valueOf(this.partition)});
        }
        HostFetchResult doSharedFetch = this.localDiskFetchEnabled && this.host.equals(this.localHostname) && this.port == this.shufflePort ? setupLocalDiskFetch() : z ? doSharedFetch() : doHttpFetch();
        if (doSharedFetch.failedInputs != null && doSharedFetch.failedInputs.length > 0) {
            if (!this.isShutDown.get()) {
                LOG.warn("copyInputs failed for tasks " + Arrays.toString(doSharedFetch.failedInputs));
                for (InputAttemptFetchFailure inputAttemptFetchFailure : doSharedFetch.failedInputs) {
                    this.fetcherCallback.fetchFailed(this.host, inputAttemptFetchFailure, doSharedFetch.connectFailed);
                }
            } else if (this.isDebugEnabled) {
                LOG.debug("Ignoring failed fetch reports for " + doSharedFetch.failedInputs.length + " inputs since the fetcher has already been stopped");
            }
        }
        shutdown();
        if (doSharedFetch.failedInputs != null || this.srcAttemptsRemaining.isEmpty() || z) {
            return doSharedFetch.fetchResult;
        }
        throw new IOException("server didn't return all expected map outputs: " + this.srcAttemptsRemaining.size() + " left.");
    }

    private int findInputs() throws IOException {
        int i = 0;
        Iterator<InputAttemptIdentifier> it = this.srcAttemptsRemaining.values().iterator();
        while (it.hasNext()) {
            try {
                if (getShuffleInputFileName(it.next().getPathComponent(), ".index") != null) {
                    i++;
                }
            } catch (DiskChecker.DiskErrorException e) {
            }
        }
        return i;
    }

    private FileLock getLock() throws OverlappingFileLockException, InterruptedException, IOException {
        File pathToFile = this.localFs.pathToFile(new Path(this.lockPath, this.host + ".lock"));
        if (!pathToFile.createNewFile() && !pathToFile.exists()) {
            return null;
        }
        FileChannel channel = new RandomAccessFile(pathToFile, "rws").getChannel();
        FileLock tryLock = channel.tryLock(0L, Long.MAX_VALUE, false);
        if (tryLock != null) {
            return tryLock;
        }
        channel.close();
        return null;
    }

    private void releaseLock(FileLock fileLock) throws IOException {
        if (fileLock == null || !fileLock.isValid()) {
            return;
        }
        FileChannel channel = fileLock.channel();
        fileLock.release();
        channel.close();
    }

    /* JADX WARN: Removed duplicated region for block: B:40:0x013c  */
    /* JADX WARN: Removed duplicated region for block: B:42:0x0166  */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    protected org.apache.tez.runtime.library.common.shuffle.Fetcher.HostFetchResult doSharedFetch() throws java.io.IOException {
        /*
            Method dump skipped, instructions count: 363
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.tez.runtime.library.common.shuffle.Fetcher.doSharedFetch():org.apache.tez.runtime.library.common.shuffle.Fetcher$HostFetchResult");
    }

    @VisibleForTesting
    protected HostFetchResult doHttpFetch() {
        return doHttpFetch(null);
    }

    private HostFetchResult setupConnection(Collection<InputAttemptIdentifier> collection) {
        try {
            StringBuilder constructBaseURIForShuffleHandler = ShuffleUtils.constructBaseURIForShuffleHandler(this.host, this.port, this.partition, this.partitionCount, this.appId.toString(), this.dagIdentifier, this.httpConnectionParams.isSslShuffle());
            this.url = ShuffleUtils.constructInputURL(constructBaseURIForShuffleHandler.toString(), collection, this.httpConnectionParams.isKeepAlive());
            this.httpConnection = ShuffleUtils.getHttpConnection(this.asyncHttp, this.url, this.httpConnectionParams, this.logIdentifier, this.jobTokenSecretMgr);
            this.httpConnection.connect();
            if (this.isShutDown.get()) {
                shutdownInternal();
                if (this.isDebugEnabled) {
                    LOG.debug("Detected fetcher has been shutdown after connection establishment. Returning");
                }
                return new HostFetchResult(new FetchResult(this.host, this.port, this.partition, this.partitionCount, this.srcAttemptsRemaining.values()), null, false);
            }
            try {
                setupConnectionInternal(this.host, collection);
                return null;
            } catch (IOException e) {
                if (!this.isShutDown.get()) {
                    InputAttemptIdentifier next = collection.iterator().next();
                    LOG.warn("FETCH_FAILURE: Fetch Failure while connecting from {} to: {}:{}, attempt: {}, url: {} Informing ShuffleManager", new Object[]{this.localHostname, this.host, Integer.valueOf(this.port), next, constructBaseURIForShuffleHandler, e});
                    return new HostFetchResult(new FetchResult(this.host, this.port, this.partition, this.partitionCount, this.srcAttemptsRemaining.values()), new InputAttemptFetchFailure[]{new InputAttemptFetchFailure(next)}, true);
                }
                if (!this.isDebugEnabled) {
                    return null;
                }
                LOG.debug("Not reporting fetch failure during connection establishment, since an Exception was caught after shutdown." + e.getClass().getName() + ", Message: " + e.getMessage());
                return null;
            } catch (InterruptedException e2) {
                Thread.currentThread().interrupt();
                return null;
            }
        } catch (IOException | InterruptedException e3) {
            if (e3 instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            InputAttemptFetchFailure[] inputAttemptFetchFailureArr = null;
            if (!this.isShutDown.get()) {
                inputAttemptFetchFailureArr = InputAttemptFetchFailure.fromAttempts(this.srcAttemptsRemaining.values());
            } else if (this.isDebugEnabled) {
                LOG.debug("Not reporting fetch failure during connection establishment, since an Exception was caught after shutdown." + e3.getClass().getName() + ", Message: " + e3.getMessage());
            }
            return new HostFetchResult(new FetchResult(this.host, this.port, this.partition, this.partitionCount, this.srcAttemptsRemaining.values()), inputAttemptFetchFailureArr, true);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void setupConnectionInternal(String str, Collection<InputAttemptIdentifier> collection) throws IOException, InterruptedException {
        this.input = this.httpConnection.getInputStream();
        this.httpConnection.validate();
    }

    @VisibleForTesting
    protected HostFetchResult doHttpFetch(CachingCallBack cachingCallBack) {
        HostFetchResult hostFetchResult = setupConnection(this.srcAttemptsRemaining.values());
        if (hostFetchResult != null) {
            return hostFetchResult;
        }
        if (this.isShutDown.get()) {
            shutdownInternal();
            if (this.isDebugEnabled) {
                LOG.debug("Detected fetcher has been shutdown after opening stream. Returning");
            }
            return new HostFetchResult(new FetchResult(this.host, this.port, this.partition, this.partitionCount, this.srcAttemptsRemaining.values()), null, false);
        }
        InputAttemptFetchFailure[] inputAttemptFetchFailureArr = null;
        while (!this.srcAttemptsRemaining.isEmpty() && inputAttemptFetchFailureArr == null) {
            InputAttemptIdentifier value = this.srcAttemptsRemaining.entrySet().iterator().next().getValue();
            if (this.isShutDown.get()) {
                shutdownInternal(true);
                if (this.isDebugEnabled) {
                    LOG.debug("Fetcher already shutdown. Aborting queued fetches for " + this.srcAttemptsRemaining.size() + " inputs");
                }
                return new HostFetchResult(new FetchResult(this.host, this.port, this.partition, this.partitionCount, this.srcAttemptsRemaining.values()), null, false);
            }
            try {
                inputAttemptFetchFailureArr = fetchInputs(this.input, cachingCallBack, value);
            } catch (FetcherReadTimeoutException e) {
                shutdownInternal(true);
                if (this.isShutDown.get()) {
                    if (this.isDebugEnabled) {
                        LOG.debug("Fetcher already shutdown. Aborting reconnection and queued fetches for " + this.srcAttemptsRemaining.size() + " inputs");
                    }
                    return new HostFetchResult(new FetchResult(this.host, this.port, this.partition, this.partitionCount, this.srcAttemptsRemaining.values()), null, false);
                }
                if (setupConnection(this.srcAttemptsRemaining.values()) != null) {
                    break;
                }
            }
        }
        if (this.isShutDown.get() && inputAttemptFetchFailureArr != null && inputAttemptFetchFailureArr.length > 0) {
            if (this.isDebugEnabled) {
                LOG.debug("Fetcher already shutdown. Not reporting fetch failures for: " + inputAttemptFetchFailureArr.length + " failed inputs");
            }
            inputAttemptFetchFailureArr = null;
        }
        return new HostFetchResult(new FetchResult(this.host, this.port, this.partition, this.partitionCount, this.srcAttemptsRemaining.values()), inputAttemptFetchFailureArr, false);
    }

    @VisibleForTesting
    protected HostFetchResult setupLocalDiskFetch() {
        return doLocalDiskFetch(true);
    }

    @VisibleForTesting
    private HostFetchResult doLocalDiskFetch(boolean z) {
        Iterator<Map.Entry<String, InputAttemptIdentifier>> it = this.srcAttemptsRemaining.entrySet().iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            boolean z2 = false;
            if (!this.isShutDown.get()) {
                InputAttemptIdentifier value = it.next().getValue();
                int i = 0;
                while (true) {
                    if (i >= this.partitionCount) {
                        break;
                    }
                    int i2 = i + this.partition;
                    value = this.pathToAttemptMap.get(new PathPartition(value.getPathComponent(), i2));
                    long currentTimeMillis = System.currentTimeMillis();
                    LocalDiskFetchedInput localDiskFetchedInput = null;
                    try {
                        TezIndexRecord tezIndexRecord = getTezIndexRecord(value, i2);
                        localDiskFetchedInput = new LocalDiskFetchedInput(tezIndexRecord.getStartOffset(), tezIndexRecord.getPartLength(), value, getShuffleInputFileName(value.getPathComponent(), null), this.conf, new FetchedInputCallback() { // from class: org.apache.tez.runtime.library.common.shuffle.Fetcher.1
                            @Override // org.apache.tez.runtime.library.common.shuffle.FetchedInputCallback
                            public void fetchComplete(FetchedInput fetchedInput) {
                            }

                            @Override // org.apache.tez.runtime.library.common.shuffle.FetchedInputCallback
                            public void fetchFailed(FetchedInput fetchedInput) {
                            }

                            @Override // org.apache.tez.runtime.library.common.shuffle.FetchedInputCallback
                            public void freeResources(FetchedInput fetchedInput) {
                            }
                        });
                        if (this.isDebugEnabled) {
                            LOG.debug("fetcher about to shuffle output of srcAttempt (direct disk)" + value + " decomp: " + tezIndexRecord.getRawLength() + " len: " + tezIndexRecord.getPartLength() + " to " + localDiskFetchedInput.getType());
                        }
                        this.fetcherCallback.fetchSucceeded(this.host, value, localDiskFetchedInput, tezIndexRecord.getPartLength(), tezIndexRecord.getRawLength(), System.currentTimeMillis() - currentTimeMillis);
                    } catch (IOException | InternalError e) {
                        z2 = true;
                        cleanupFetchedInput(localDiskFetchedInput);
                        if (this.isShutDown.get()) {
                            if (this.isDebugEnabled) {
                                LOG.debug("Already shutdown. Ignoring Local Fetch Failure for " + value + " from host " + this.host + " : " + e.getClass().getName() + ", message=" + e.getMessage());
                            }
                        } else if (z) {
                            LOG.warn("Failed to shuffle output of " + value + " from " + this.host + "(local fetch)", e);
                        }
                    }
                    i++;
                }
                if (!z2) {
                    it.remove();
                }
            } else if (this.isDebugEnabled) {
                LOG.debug("Already shutdown. Skipping fetch for " + this.srcAttemptsRemaining.size() + " inputs");
            }
        }
        InputAttemptFetchFailure[] inputAttemptFetchFailureArr = null;
        if (z && this.srcAttemptsRemaining.size() > 0) {
            if (!this.isShutDown.get()) {
                inputAttemptFetchFailureArr = InputAttemptFetchFailure.fromAttemptsLocalFetchFailure(this.srcAttemptsRemaining.values());
            } else if (this.isDebugEnabled) {
                LOG.debug("Already shutdown, not reporting fetch failures for: " + this.srcAttemptsRemaining.size() + " remaining inputs");
            }
        }
        return new HostFetchResult(new FetchResult(this.host, this.port, this.partition, this.partitionCount, this.srcAttemptsRemaining.values()), inputAttemptFetchFailureArr, false);
    }

    @VisibleForTesting
    protected TezIndexRecord getTezIndexRecord(InputAttemptIdentifier inputAttemptIdentifier, int i) throws IOException {
        return new TezSpillRecord(getShuffleInputFileName(inputAttemptIdentifier.getPathComponent(), ".index"), (FileSystem) this.localFs).getIndex(i);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public final String getMapOutputFile(String str) {
        String str2 = "output/" + str + "/file.out";
        return ShuffleUtils.isTezShuffleHandler(this.conf) ? Constants.DAG_PREFIX + this.dagIdentifier + "/" + str2 : str2;
    }

    @VisibleForTesting
    protected Path getShuffleInputFileName(String str, String str2) throws IOException {
        return this.localDirAllocator.getLocalPathToRead(getMapOutputFile(str) + (str2 != null ? str2 : TezRuntimeFrameworkConfigs.TEZ_RUNTIME_METRICS_SESSION_ID_DEFAULT), this.conf);
    }

    @VisibleForTesting
    public Map<PathPartition, InputAttemptIdentifier> getPathToAttemptMap() {
        return this.pathToAttemptMap;
    }

    public void shutdown() {
        if (this.isShutDown.getAndSet(true)) {
            return;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Shutting down fetcher for host: " + this.host);
        }
        shutdownInternal();
    }

    private void shutdownInternal() {
        shutdownInternal(false);
    }

    private void shutdownInternal(boolean z) {
        synchronized (this.isShutDown) {
            try {
                if (this.httpConnection != null) {
                    this.httpConnection.cleanup(z);
                }
            } catch (IOException e) {
                LOG.info("Exception while shutting down fetcher on " + this.logIdentifier + " : " + e.getMessage());
                if (this.isDebugEnabled) {
                    LOG.debug(TezRuntimeFrameworkConfigs.TEZ_RUNTIME_METRICS_SESSION_ID_DEFAULT, e);
                }
            }
        }
    }

    @VisibleForTesting
    InputAttemptFetchFailure[] fetchInputs(DataInputStream dataInputStream, CachingCallBack cachingCallBack, InputAttemptIdentifier inputAttemptIdentifier) throws FetcherReadTimeoutException {
        try {
            long currentTimeMillis = System.currentTimeMillis();
            int readVInt = this.compositeFetch ? WritableUtils.readVInt(dataInputStream) : 1;
            ArrayList arrayList = new ArrayList(readVInt);
            for (int i = 0; i < readVInt; i++) {
                try {
                    ShuffleHeader shuffleHeader = new ShuffleHeader();
                    shuffleHeader.readFields(dataInputStream);
                    String mapId = shuffleHeader.getMapId();
                    if (!mapId.startsWith(InputAttemptIdentifier.PATH_PREFIX)) {
                        if (!mapId.startsWith(ShuffleHandlerError.DISK_ERROR_EXCEPTION.toString())) {
                            throw new IllegalArgumentException("Invalid map id: " + shuffleHeader.getMapId() + ", expected to start with " + InputAttemptIdentifier.PATH_PREFIX + ", partition: " + shuffleHeader.getPartition() + " while fetching " + inputAttemptIdentifier);
                        }
                        LOG.warn("Invalid map id: " + shuffleHeader.getMapId() + ", expected to start with " + InputAttemptIdentifier.PATH_PREFIX + ", partition: " + shuffleHeader.getPartition() + " while fetching " + inputAttemptIdentifier);
                        return new InputAttemptFetchFailure[]{InputAttemptFetchFailure.fromDiskErrorAtSource(inputAttemptIdentifier)};
                    }
                    InputAttemptIdentifier inputAttemptIdentifier2 = this.pathToAttemptMap.get(new PathPartition(mapId, shuffleHeader.getPartition()));
                    if (inputAttemptIdentifier2 == null) {
                        throw new IllegalArgumentException("Source attempt not found for map id: " + shuffleHeader.getMapId() + ", partition: " + shuffleHeader.getPartition() + " while fetching " + inputAttemptIdentifier);
                    }
                    if (shuffleHeader.getCompressedLength() != 0) {
                        MapOutputStat mapOutputStat = new MapOutputStat(inputAttemptIdentifier2, shuffleHeader.getUncompressedLength(), shuffleHeader.getCompressedLength(), shuffleHeader.getPartition());
                        arrayList.add(mapOutputStat);
                        if (!verifySanity(mapOutputStat.compressedLength, mapOutputStat.decompressedLength, shuffleHeader.getPartition(), mapOutputStat.srcAttemptId, mapId)) {
                            if (this.isShutDown.get()) {
                                if (!this.isDebugEnabled) {
                                    return null;
                                }
                                LOG.debug("Already shutdown. Ignoring verification failure.");
                                return null;
                            }
                            InputAttemptIdentifier inputAttemptIdentifier3 = mapOutputStat.srcAttemptId;
                            if (inputAttemptIdentifier3 == null) {
                                LOG.warn("Was expecting " + getNextRemainingAttempt() + " but got null");
                                inputAttemptIdentifier3 = getNextRemainingAttempt();
                            }
                            if ($assertionsDisabled || inputAttemptIdentifier3 != null) {
                                return new InputAttemptFetchFailure[]{InputAttemptFetchFailure.fromAttempt(inputAttemptIdentifier3)};
                            }
                            throw new AssertionError();
                        }
                        if (this.isDebugEnabled) {
                            LOG.debug("header: " + mapOutputStat.srcAttemptId + ", len: " + mapOutputStat.compressedLength + ", decomp len: " + mapOutputStat.decompressedLength);
                        }
                    }
                } catch (IllegalArgumentException e) {
                    if (!this.isShutDown.get()) {
                        LOG.warn("Invalid src id ", e);
                        return InputAttemptFetchFailure.fromAttempts(this.srcAttemptsRemaining.values());
                    }
                    if (!this.isDebugEnabled) {
                        return null;
                    }
                    LOG.debug("Already shutdown. Ignoring badId error with message: " + e.getMessage());
                    return null;
                }
            }
            Iterator it = arrayList.iterator();
            while (it.hasNext()) {
                MapOutputStat mapOutputStat2 = (MapOutputStat) it.next();
                InputAttemptIdentifier inputAttemptIdentifier4 = mapOutputStat2.srcAttemptId;
                long j = mapOutputStat2.decompressedLength;
                long j2 = mapOutputStat2.compressedLength;
                FetchedInput allocate = (!inputAttemptIdentifier4.isShared() || cachingCallBack == null) ? this.inputManager.allocate(j, j2, inputAttemptIdentifier4) : this.inputManager.allocateType(FetchedInput.Type.DISK, j, j2, inputAttemptIdentifier4);
                if (this.isDebugEnabled) {
                    LOG.debug("fetcher about to shuffle output of srcAttempt " + allocate.getInputAttemptIdentifier() + " decomp: " + j + " len: " + j2 + " to " + allocate.getType());
                }
                if (allocate.getType() == FetchedInput.Type.MEMORY) {
                    ShuffleUtils.shuffleToMemory(((MemoryFetchedInput) allocate).getBytes(), dataInputStream, (int) j, (int) j2, this.codec, this.ifileReadAhead, this.ifileReadAheadLength, LOG, allocate.getInputAttemptIdentifier());
                } else {
                    if (allocate.getType() != FetchedInput.Type.DISK) {
                        throw new TezUncheckedException("Bad fetchedInput type while fetching shuffle data " + allocate);
                    }
                    ShuffleUtils.shuffleToDisk(((DiskFetchedInput) allocate).getOutputStream(), this.host + ":" + this.port, dataInputStream, j2, j, LOG, allocate.getInputAttemptIdentifier(), this.ifileReadAhead, this.ifileReadAheadLength, this.verifyDiskChecksum);
                }
                if (inputAttemptIdentifier4.isShared() && cachingCallBack != null) {
                    cachingCallBack.cache(this.host, inputAttemptIdentifier4, allocate, j2, j);
                }
                long currentTimeMillis2 = System.currentTimeMillis();
                this.retryStartTime = 0L;
                this.fetcherCallback.fetchSucceeded(this.host, inputAttemptIdentifier4, allocate, j2, j, currentTimeMillis2 - currentTimeMillis);
            }
            this.srcAttemptsRemaining.remove(inputAttemptIdentifier.toString());
            return null;
        } catch (IOException | InternalError e2) {
            if (this.isShutDown.get()) {
                cleanupFetchedInput(null);
                if (!this.isDebugEnabled) {
                    return null;
                }
                LOG.debug("Already shutdown. Ignoring exception during fetch " + e2.getClass().getName() + ", Message: " + e2.getMessage());
                return null;
            }
            if (shouldRetry(null, e2)) {
                cleanupFetchedInput(null);
                throw new FetcherReadTimeoutException(e2);
            }
            if (0 == 0 || 0 == 0) {
                LOG.info("fetcher failed to read map header" + ((Object) null) + " decomp: 0, 0", e2);
                cleanupFetchedInput(null);
                return 0 == 0 ? InputAttemptFetchFailure.fromAttempts(this.srcAttemptsRemaining.values()) : new InputAttemptFetchFailure[]{new InputAttemptFetchFailure(null)};
            }
            LOG.warn("Failed to shuffle output of " + ((Object) null) + " from " + this.host + " (to " + this.localHostname + ")", e2);
            cleanupFetchedInput(null);
            return new InputAttemptFetchFailure[]{new InputAttemptFetchFailure(null)};
        }
    }

    private void cleanupFetchedInput(FetchedInput fetchedInput) {
        if (fetchedInput != null) {
            try {
                fetchedInput.abort();
            } catch (IOException e) {
                LOG.info("Failure to cleanup fetchedInput: " + fetchedInput);
            }
        }
    }

    private boolean shouldRetry(InputAttemptIdentifier inputAttemptIdentifier, Throwable th) {
        if (!(th instanceof SocketTimeoutException)) {
            return false;
        }
        long currentTimeMillis = System.currentTimeMillis();
        if (this.retryStartTime == 0) {
            this.retryStartTime = currentTimeMillis;
        }
        if ((currentTimeMillis - this.retryStartTime) - this.httpConnectionParams.getReadTimeout() < 0) {
            LOG.warn("Shuffle output from " + inputAttemptIdentifier + " failed (to " + this.localHostname + "), retry it.");
            return true;
        }
        LOG.warn("Timeout for copying MapOutput with retry on host " + this.host + "after " + this.httpConnectionParams.getReadTimeout() + "milliseconds.");
        return false;
    }

    private boolean verifySanity(long j, long j2, int i, InputAttemptIdentifier inputAttemptIdentifier, String str) {
        if (j < 0 || j2 < 0) {
            LOG.warn(" invalid lengths in input header -> headerPathComponent: " + str + ", nextRemainingSrcAttemptId: " + getNextRemainingAttempt() + ", mappedSrcAttemptId: " + inputAttemptIdentifier + " len: " + j + ", decomp len: " + j2);
            return false;
        }
        if (i >= this.partition && i < this.partition + this.partitionCount) {
            return true;
        }
        LOG.warn(" data for the wrong reduce -> headerPathComponent: " + str + "nextRemainingSrcAttemptId: " + getNextRemainingAttempt() + ", mappedSrcAttemptId: " + inputAttemptIdentifier + " len: " + j + " decomp len: " + j2 + " for reduce " + i);
        return false;
    }

    private InputAttemptIdentifier getNextRemainingAttempt() {
        if (this.srcAttemptsRemaining.size() > 0) {
            return this.srcAttemptsRemaining.values().iterator().next();
        }
        return null;
    }

    public int hashCode() {
        return this.fetcherIdentifier;
    }

    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        return obj != null && getClass() == obj.getClass() && this.fetcherIdentifier == ((Fetcher) obj).fetcherIdentifier;
    }

    static {
        $assertionsDisabled = !Fetcher.class.desiredAssertionStatus();
        LOG = LoggerFactory.getLogger(Fetcher.class);
        fetcherIdGen = new AtomicInteger(0);
    }
}
