package org.apache.spark.network.shuffle;

import com.codahale.metrics.MetricSet;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import org.apache.spark.internal.LogKeys$APP_ATTEMPT_ID$;
import org.apache.spark.internal.LogKeys$BLOCK_IDS$;
import org.apache.spark.internal.LogKeys$EXECUTOR_ID$;
import org.apache.spark.internal.LogKeys$HOST$;
import org.apache.spark.internal.LogKeys$PORT$;
import org.apache.spark.internal.MDC;
import org.apache.spark.network.TransportContext;
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.client.MergedBlockMetaResponseCallback;
import org.apache.spark.network.client.RpcResponseCallback;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.crypto.AuthClientBootstrap;
import org.apache.spark.network.sasl.SecretKeyHolder;
import org.apache.spark.network.server.NoOpRpcHandler;
import org.apache.spark.network.shuffle.ErrorHandler;
import org.apache.spark.network.shuffle.RetryingBlockTransferor;
import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
import org.apache.spark.network.shuffle.protocol.BlocksRemoved;
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
import org.apache.spark.network.shuffle.protocol.FinalizeShuffleMerge;
import org.apache.spark.network.shuffle.protocol.MergeStatuses;
import org.apache.spark.network.shuffle.protocol.RegisterExecutor;
import org.apache.spark.network.shuffle.protocol.RemoveBlocks;
import org.apache.spark.network.shuffle.protocol.RemoveShuffleMerge;
import org.apache.spark.network.util.TransportConf;
import org.sparkproject.guava.collect.Lists;

/* loaded from: input_file:org/apache/spark/network/shuffle/ExternalBlockStoreClient.class */
public class ExternalBlockStoreClient extends BlockStoreClient {
    private static final ErrorHandler PUSH_ERROR_HANDLER;
    private final boolean authEnabled;
    private final SecretKeyHolder secretKeyHolder;
    private final long registrationTimeoutMs;
    private int comparableAppAttemptId = -1;
    static final /* synthetic */ boolean $assertionsDisabled;

    public ExternalBlockStoreClient(TransportConf transportConf, SecretKeyHolder secretKeyHolder, boolean z, long j) {
        this.transportConf = transportConf;
        this.secretKeyHolder = secretKeyHolder;
        this.authEnabled = z;
        this.registrationTimeoutMs = j;
    }

    public void init(String str) {
        this.appId = str;
        TransportContext transportContext = new TransportContext(this.transportConf, new NoOpRpcHandler(), true, true);
        ArrayList newArrayList = Lists.newArrayList();
        if (this.authEnabled) {
            newArrayList.add(new AuthClientBootstrap(this.transportConf, str, this.secretKeyHolder));
        }
        this.clientFactory = transportContext.createClientFactory(newArrayList);
    }

    @Override // org.apache.spark.network.shuffle.BlockStoreClient
    public void setAppAttemptId(String str) {
        super.setAppAttemptId(str);
        setComparableAppAttemptId(str);
    }

    private void setComparableAppAttemptId(String str) {
        try {
            this.comparableAppAttemptId = Integer.parseInt(str);
        } catch (NumberFormatException e) {
            this.logger.warn("Push based shuffle requires comparable application attemptId, but the appAttemptId {} cannot be parsed to Integer", e, new MDC[]{MDC.of(LogKeys$APP_ATTEMPT_ID$.MODULE$, str)});
        }
    }

    @Override // org.apache.spark.network.shuffle.BlockStoreClient
    public void fetchBlocks(String str, int i, String str2, String[] strArr, BlockFetchingListener blockFetchingListener, DownloadFileManager downloadFileManager) {
        checkInit();
        this.logger.debug("External shuffle fetch from {}:{} (executor id {})", new Object[]{str, Integer.valueOf(i), str2});
        try {
            int maxIORetries = this.transportConf.maxIORetries();
            RetryingBlockTransferor.BlockTransferStarter blockTransferStarter = (strArr2, blockTransferListener) -> {
                if (this.clientFactory == null) {
                    this.logger.info("This clientFactory was closed. Skipping further block fetch retries.");
                } else {
                    if (!$assertionsDisabled && !(blockTransferListener instanceof BlockFetchingListener)) {
                        throw new AssertionError("Expecting a BlockFetchingListener, but got " + blockTransferListener.getClass());
                    }
                    new OneForOneBlockFetcher(this.clientFactory.createClient(str, i, maxIORetries > 0), this.appId, str2, strArr2, (BlockFetchingListener) blockTransferListener, this.transportConf, downloadFileManager).start();
                }
            };
            if (maxIORetries > 0) {
                new RetryingBlockTransferor(this.transportConf, blockTransferStarter, strArr, blockFetchingListener).start();
            } else {
                blockTransferStarter.createAndStart(strArr, blockFetchingListener);
            }
        } catch (Exception e) {
            this.logger.error("Exception while beginning fetchBlocks", e);
            for (String str3 : strArr) {
                blockFetchingListener.onBlockFetchFailure(str3, e);
            }
        }
    }

    @Override // org.apache.spark.network.shuffle.BlockStoreClient
    public void pushBlocks(String str, int i, String[] strArr, ManagedBuffer[] managedBufferArr, BlockPushingListener blockPushingListener) {
        checkInit();
        if (!$assertionsDisabled && strArr.length != managedBufferArr.length) {
            throw new AssertionError("Number of block ids and buffers do not match.");
        }
        HashMap hashMap = new HashMap();
        for (int i2 = 0; i2 < strArr.length; i2++) {
            hashMap.put(strArr[i2], managedBufferArr[i2]);
        }
        this.logger.debug("Push {} shuffle blocks to {}:{}", new Object[]{Integer.valueOf(strArr.length), str, Integer.valueOf(i)});
        try {
            RetryingBlockTransferor.BlockTransferStarter blockTransferStarter = (strArr2, blockTransferListener) -> {
                if (this.clientFactory == null) {
                    this.logger.info("This clientFactory was closed. Skipping further block push retries.");
                } else {
                    if (!$assertionsDisabled && !(blockTransferListener instanceof BlockPushingListener)) {
                        throw new AssertionError("Expecting a BlockPushingListener, but got " + blockTransferListener.getClass());
                    }
                    new OneForOneBlockPusher(this.clientFactory.createClient(str, i), this.appId, this.comparableAppAttemptId, strArr2, (BlockPushingListener) blockTransferListener, hashMap).start();
                }
            };
            if (this.transportConf.maxIORetries() > 0) {
                new RetryingBlockTransferor(this.transportConf, blockTransferStarter, strArr, blockPushingListener, PUSH_ERROR_HANDLER).start();
            } else {
                blockTransferStarter.createAndStart(strArr, blockPushingListener);
            }
        } catch (Exception e) {
            this.logger.error("Exception while beginning pushBlocks", e);
            for (String str2 : strArr) {
                blockPushingListener.onBlockPushFailure(str2, e);
            }
        }
    }

    @Override // org.apache.spark.network.shuffle.BlockStoreClient
    public void finalizeShuffleMerge(String str, int i, int i2, int i3, final MergeFinalizerListener mergeFinalizerListener) {
        checkInit();
        try {
            this.clientFactory.createClient(str, i).sendRpc(new FinalizeShuffleMerge(this.appId, this.comparableAppAttemptId, i2, i3).toByteBuffer(), new RpcResponseCallback() { // from class: org.apache.spark.network.shuffle.ExternalBlockStoreClient.1
                public void onSuccess(ByteBuffer byteBuffer) {
                    mergeFinalizerListener.onShuffleMergeSuccess((MergeStatuses) BlockTransferMessage.Decoder.fromByteBuffer(byteBuffer));
                }

                public void onFailure(Throwable th) {
                    mergeFinalizerListener.onShuffleMergeFailure(th);
                }
            });
        } catch (Exception e) {
            this.logger.error("Exception while sending finalizeShuffleMerge request to {}:{}", e, new MDC[]{MDC.of(LogKeys$HOST$.MODULE$, str), MDC.of(LogKeys$PORT$.MODULE$, Integer.valueOf(i))});
            mergeFinalizerListener.onShuffleMergeFailure(e);
        }
    }

    @Override // org.apache.spark.network.shuffle.BlockStoreClient
    public void getMergedBlockMeta(String str, int i, final int i2, final int i3, final int i4, final MergedBlocksMetaListener mergedBlocksMetaListener) {
        checkInit();
        this.logger.debug("Get merged blocks meta from {}:{} for shuffleId {} shuffleMergeId {} reduceId {}", new Object[]{str, Integer.valueOf(i), Integer.valueOf(i2), Integer.valueOf(i3), Integer.valueOf(i4)});
        try {
            this.clientFactory.createClient(str, i).sendMergedBlockMetaReq(this.appId, i2, i3, i4, new MergedBlockMetaResponseCallback() { // from class: org.apache.spark.network.shuffle.ExternalBlockStoreClient.2
                public void onSuccess(int i5, ManagedBuffer managedBuffer) {
                    ExternalBlockStoreClient.this.logger.trace("Successfully got merged block meta for shuffleId {} shuffleMergeId {} reduceId {}", new Object[]{Integer.valueOf(i2), Integer.valueOf(i3), Integer.valueOf(i4)});
                    mergedBlocksMetaListener.onSuccess(i2, i3, i4, new MergedBlockMeta(i5, managedBuffer));
                }

                public void onFailure(Throwable th) {
                    mergedBlocksMetaListener.onFailure(i2, i3, i4, th);
                }
            });
        } catch (Exception e) {
            mergedBlocksMetaListener.onFailure(i2, i3, i4, e);
        }
    }

    @Override // org.apache.spark.network.shuffle.BlockStoreClient
    public boolean removeShuffleMerge(String str, int i, int i2, int i3) {
        checkInit();
        try {
            this.clientFactory.createClient(str, i).send(new RemoveShuffleMerge(this.appId, this.comparableAppAttemptId, i2, i3).toByteBuffer());
            return true;
        } catch (Exception e) {
            this.logger.debug("Exception while sending RemoveShuffleMerge request to {}:{}", new Object[]{str, Integer.valueOf(i), e});
            return false;
        }
    }

    @Override // org.apache.spark.network.shuffle.BlockStoreClient
    public MetricSet shuffleMetrics() {
        checkInit();
        return this.clientFactory.getAllMetrics();
    }

    public void registerWithShuffleServer(String str, int i, String str2, ExecutorShuffleInfo executorShuffleInfo) throws IOException, InterruptedException {
        checkInit();
        TransportClient createClient = this.clientFactory.createClient(str, i);
        try {
            createClient.sendRpcSync(new RegisterExecutor(this.appId, str2, executorShuffleInfo).toByteBuffer(), this.registrationTimeoutMs);
            if (createClient != null) {
                createClient.close();
            }
        } catch (Throwable th) {
            if (createClient != null) {
                try {
                    createClient.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    public Future<Integer> removeBlocks(String str, int i, final String str2, final String[] strArr) throws IOException, InterruptedException {
        checkInit();
        final CompletableFuture completableFuture = new CompletableFuture();
        this.clientFactory.createClient(str, i).sendRpc(new RemoveBlocks(this.appId, str2, strArr).toByteBuffer(), new RpcResponseCallback() { // from class: org.apache.spark.network.shuffle.ExternalBlockStoreClient.3
            public void onSuccess(ByteBuffer byteBuffer) {
                try {
                    completableFuture.complete(Integer.valueOf(((BlocksRemoved) BlockTransferMessage.Decoder.fromByteBuffer(byteBuffer)).numRemovedBlocks));
                } catch (Throwable th) {
                    ExternalBlockStoreClient.this.logger.warn("Error trying to remove blocks {} via external shuffle service from executor: {}", th, new MDC[]{MDC.of(LogKeys$BLOCK_IDS$.MODULE$, Arrays.toString(strArr)), MDC.of(LogKeys$EXECUTOR_ID$.MODULE$, str2)});
                    completableFuture.complete(0);
                }
            }

            public void onFailure(Throwable th) {
                ExternalBlockStoreClient.this.logger.warn("Error trying to remove blocks {} via external shuffle service from executor: {}", th, new MDC[]{MDC.of(LogKeys$BLOCK_IDS$.MODULE$, Arrays.toString(strArr)), MDC.of(LogKeys$EXECUTOR_ID$.MODULE$, str2)});
                completableFuture.complete(0);
            }
        });
        return completableFuture;
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        checkInit();
        if (this.clientFactory != null) {
            this.clientFactory.close();
            this.clientFactory = null;
        }
    }

    static {
        $assertionsDisabled = !ExternalBlockStoreClient.class.desiredAssertionStatus();
        PUSH_ERROR_HANDLER = new ErrorHandler.BlockPushErrorHandler();
    }
}
