package org.apache.spark.network.shuffle;

import java.io.IOException;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.spark.internal.LogKeys$BLOCK_ID$;
import org.apache.spark.internal.LogKeys$MAX_ATTEMPTS$;
import org.apache.spark.internal.LogKeys$NUM_BLOCKS$;
import org.apache.spark.internal.LogKeys$NUM_RETRY$;
import org.apache.spark.internal.LogKeys$RETRY_WAIT_TIME$;
import org.apache.spark.internal.LogKeys$TRANSFER_TYPE$;
import org.apache.spark.internal.MDC;
import org.apache.spark.internal.SparkLogger;
import org.apache.spark.internal.SparkLoggerFactory;
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.sasl.SaslTimeoutException;
import org.apache.spark.network.util.NettyUtils;
import org.apache.spark.network.util.TransportConf;
import org.sparkproject.guava.annotations.VisibleForTesting;
import org.sparkproject.guava.base.Preconditions;
import org.sparkproject.guava.collect.Sets;
import org.sparkproject.guava.util.concurrent.Uninterruptibles;

/* loaded from: input_file:org/apache/spark/network/shuffle/RetryingBlockTransferor.class */
public class RetryingBlockTransferor {
    private static final ExecutorService executorService = Executors.newCachedThreadPool(NettyUtils.createThreadFactory("Block Transfer Retry"));
    private static final SparkLogger logger = SparkLoggerFactory.getLogger(RetryingBlockTransferor.class);
    private final BlockTransferStarter transferStarter;
    private final BlockTransferListener listener;
    private final int maxRetries;
    private final int retryWaitTime;
    private int retryCount;
    private int saslRetryCount;
    private final LinkedHashSet<String> outstandingBlocksIds;
    private RetryingBlockTransferListener currentListener;
    private final boolean enableSaslRetries;
    private final ErrorHandler errorHandler;

    /* loaded from: input_file:org/apache/spark/network/shuffle/RetryingBlockTransferor$BlockTransferStarter.class */
    public interface BlockTransferStarter {
        void createAndStart(String[] strArr, BlockTransferListener blockTransferListener) throws IOException, InterruptedException;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    /* loaded from: input_file:org/apache/spark/network/shuffle/RetryingBlockTransferor$RetryingBlockTransferListener.class */
    public class RetryingBlockTransferListener implements BlockFetchingListener, BlockPushingListener {
        RetryingBlockTransferListener() {
        }

        private void handleBlockTransferSuccess(String str, ManagedBuffer managedBuffer) {
            boolean z = false;
            synchronized (RetryingBlockTransferor.this) {
                if (this == RetryingBlockTransferor.this.currentListener && RetryingBlockTransferor.this.outstandingBlocksIds.contains(str)) {
                    RetryingBlockTransferor.this.outstandingBlocksIds.remove(str);
                    z = true;
                    if (RetryingBlockTransferor.this.saslRetryCount > 0) {
                        Preconditions.checkState(RetryingBlockTransferor.this.retryCount >= RetryingBlockTransferor.this.saslRetryCount, "retryCount must be greater than or equal to saslRetryCount");
                        RetryingBlockTransferor.this.retryCount -= RetryingBlockTransferor.this.saslRetryCount;
                        RetryingBlockTransferor.this.saslRetryCount = 0;
                    }
                }
            }
            if (z) {
                RetryingBlockTransferor.this.listener.onBlockTransferSuccess(str, managedBuffer);
            }
        }

        private void handleBlockTransferFailure(String str, Throwable th) {
            boolean z = false;
            synchronized (RetryingBlockTransferor.this) {
                if (this == RetryingBlockTransferor.this.currentListener && RetryingBlockTransferor.this.outstandingBlocksIds.contains(str)) {
                    if (!RetryingBlockTransferor.this.shouldRetry(th)) {
                        if (RetryingBlockTransferor.this.errorHandler.shouldLogError(th)) {
                            RetryingBlockTransferor.logger.error("Failed to {} block {}, and will not retry ({} retries)", th, new MDC[]{MDC.of(LogKeys$TRANSFER_TYPE$.MODULE$, RetryingBlockTransferor.this.listener.getTransferType()), MDC.of(LogKeys$BLOCK_ID$.MODULE$, str), MDC.of(LogKeys$NUM_RETRY$.MODULE$, Integer.valueOf(RetryingBlockTransferor.this.retryCount))});
                        } else {
                            RetryingBlockTransferor.logger.debug(String.format("Failed to %s block %s, and will not retry (%s retries)", RetryingBlockTransferor.this.listener.getTransferType(), str, Integer.valueOf(RetryingBlockTransferor.this.retryCount)), th);
                        }
                        RetryingBlockTransferor.this.outstandingBlocksIds.remove(str);
                        z = true;
                    } else if (!RetryingBlockTransferor.this.initiateRetry(th)) {
                        RetryingBlockTransferor.this.outstandingBlocksIds.remove(str);
                        z = true;
                    }
                }
            }
            if (z) {
                RetryingBlockTransferor.this.listener.onBlockTransferFailure(str, th);
            }
        }

        @Override // org.apache.spark.network.shuffle.BlockFetchingListener
        public void onBlockFetchSuccess(String str, ManagedBuffer managedBuffer) {
            handleBlockTransferSuccess(str, managedBuffer);
        }

        @Override // org.apache.spark.network.shuffle.BlockFetchingListener
        public void onBlockFetchFailure(String str, Throwable th) {
            handleBlockTransferFailure(str, th);
        }

        @Override // org.apache.spark.network.shuffle.BlockPushingListener
        public void onBlockPushSuccess(String str, ManagedBuffer managedBuffer) {
            handleBlockTransferSuccess(str, managedBuffer);
        }

        @Override // org.apache.spark.network.shuffle.BlockPushingListener
        public void onBlockPushFailure(String str, Throwable th) {
            handleBlockTransferFailure(str, th);
        }

        @Override // org.apache.spark.network.shuffle.BlockFetchingListener, org.apache.spark.network.shuffle.BlockTransferListener
        public void onBlockTransferSuccess(String str, ManagedBuffer managedBuffer) {
            throw new RuntimeException("Invocation on RetryingBlockTransferListener.onBlockTransferSuccess is unexpected.");
        }

        @Override // org.apache.spark.network.shuffle.BlockFetchingListener, org.apache.spark.network.shuffle.BlockTransferListener
        public void onBlockTransferFailure(String str, Throwable th) {
            throw new RuntimeException("Invocation on RetryingBlockTransferListener.onBlockTransferFailure is unexpected.");
        }

        @Override // org.apache.spark.network.shuffle.BlockFetchingListener, org.apache.spark.network.shuffle.BlockTransferListener
        public String getTransferType() {
            throw new RuntimeException("Invocation on RetryingBlockTransferListener.getTransferType is unexpected.");
        }
    }

    public RetryingBlockTransferor(TransportConf transportConf, BlockTransferStarter blockTransferStarter, String[] strArr, BlockTransferListener blockTransferListener, ErrorHandler errorHandler) {
        this.retryCount = 0;
        this.saslRetryCount = 0;
        this.transferStarter = blockTransferStarter;
        this.listener = blockTransferListener;
        this.maxRetries = transportConf.maxIORetries();
        this.retryWaitTime = transportConf.ioRetryWaitTimeMs();
        this.outstandingBlocksIds = Sets.newLinkedHashSet();
        Collections.addAll(this.outstandingBlocksIds, strArr);
        this.currentListener = new RetryingBlockTransferListener();
        this.errorHandler = errorHandler;
        this.enableSaslRetries = transportConf.enableSaslRetries();
        this.saslRetryCount = 0;
    }

    public RetryingBlockTransferor(TransportConf transportConf, BlockTransferStarter blockTransferStarter, String[] strArr, BlockFetchingListener blockFetchingListener) {
        this(transportConf, blockTransferStarter, strArr, blockFetchingListener, ErrorHandler.NOOP_ERROR_HANDLER);
    }

    @VisibleForTesting
    synchronized void setCurrentListener(RetryingBlockTransferListener retryingBlockTransferListener) {
        this.currentListener = retryingBlockTransferListener;
    }

    public void start() {
        transferAllOutstanding();
    }

    private void transferAllOutstanding() {
        String[] strArr;
        int i;
        RetryingBlockTransferListener retryingBlockTransferListener;
        synchronized (this) {
            strArr = (String[]) this.outstandingBlocksIds.toArray(new String[this.outstandingBlocksIds.size()]);
            i = this.retryCount;
            retryingBlockTransferListener = this.currentListener;
        }
        try {
            this.transferStarter.createAndStart(strArr, retryingBlockTransferListener);
        } catch (Exception e) {
            if (i > 0) {
                logger.error("Exception while beginning {} of {} outstanding blocks (after {} retries)", e, new MDC[]{MDC.of(LogKeys$TRANSFER_TYPE$.MODULE$, this.listener.getTransferType()), MDC.of(LogKeys$NUM_BLOCKS$.MODULE$, Integer.valueOf(strArr.length)), MDC.of(LogKeys$NUM_RETRY$.MODULE$, Integer.valueOf(i))});
            } else {
                logger.error("Exception while beginning {} of {} outstanding blocks", e, new MDC[]{MDC.of(LogKeys$TRANSFER_TYPE$.MODULE$, this.listener.getTransferType()), MDC.of(LogKeys$NUM_BLOCKS$.MODULE$, Integer.valueOf(strArr.length))});
            }
            if (shouldRetry(e) && initiateRetry(e)) {
                return;
            }
            for (String str : strArr) {
                this.listener.onBlockTransferFailure(str, e);
            }
        }
    }

    @VisibleForTesting
    synchronized boolean initiateRetry(Throwable th) {
        if (this.enableSaslRetries && (th instanceof SaslTimeoutException)) {
            this.saslRetryCount++;
        }
        this.retryCount++;
        this.currentListener = new RetryingBlockTransferListener();
        logger.info("Retrying {} ({}/{}) for {} outstanding blocks after {} ms", new MDC[]{MDC.of(LogKeys$TRANSFER_TYPE$.MODULE$, this.listener.getTransferType()), MDC.of(LogKeys$NUM_RETRY$.MODULE$, Integer.valueOf(this.retryCount)), MDC.of(LogKeys$MAX_ATTEMPTS$.MODULE$, Integer.valueOf(this.maxRetries)), MDC.of(LogKeys$NUM_BLOCKS$.MODULE$, Integer.valueOf(this.outstandingBlocksIds.size())), MDC.of(LogKeys$RETRY_WAIT_TIME$.MODULE$, Integer.valueOf(this.retryWaitTime))});
        try {
            executorService.execute(() -> {
                Uninterruptibles.sleepUninterruptibly(this.retryWaitTime, TimeUnit.MILLISECONDS);
                transferAllOutstanding();
            });
            return true;
        } catch (Throwable th2) {
            logger.error("Exception while trying to initiate retry", th2);
            return false;
        }
    }

    private synchronized boolean shouldRetry(Throwable th) {
        boolean z = (th instanceof IOException) || (th.getCause() instanceof IOException);
        boolean z2 = this.enableSaslRetries && (th instanceof SaslTimeoutException);
        if (!z2 && this.saslRetryCount > 0) {
            Preconditions.checkState(this.retryCount >= this.saslRetryCount, "retryCount must be greater than or equal to saslRetryCount");
            this.retryCount -= this.saslRetryCount;
            this.saslRetryCount = 0;
        }
        return (z2 || z) && (this.retryCount < this.maxRetries) && this.errorHandler.shouldRetryError(th);
    }

    @VisibleForTesting
    public int getRetryCount() {
        return this.retryCount;
    }
}
