package org.apache.hadoop.mapred;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.net.URL;
import java.nio.channels.ClosedChannelException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import javax.crypto.SecretKey;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.shaded.io.netty.buffer.ByteBuf;
import org.apache.hadoop.hbase.shaded.io.netty.buffer.Unpooled;
import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel;
import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelFuture;
import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelFutureListener;
import org.apache.hadoop.hbase.shaded.io.netty.channel.ChannelHandlerContext;
import org.apache.hadoop.hbase.shaded.io.netty.channel.SimpleChannelInboundHandler;
import org.apache.hadoop.hbase.shaded.io.netty.handler.codec.TooLongFrameException;
import org.apache.hadoop.hbase.shaded.io.netty.handler.codec.http.DefaultFullHttpResponse;
import org.apache.hadoop.hbase.shaded.io.netty.handler.codec.http.DefaultHttpResponse;
import org.apache.hadoop.hbase.shaded.io.netty.handler.codec.http.FullHttpRequest;
import org.apache.hadoop.hbase.shaded.io.netty.handler.codec.http.HttpHeaderNames;
import org.apache.hadoop.hbase.shaded.io.netty.handler.codec.http.HttpMethod;
import org.apache.hadoop.hbase.shaded.io.netty.handler.codec.http.HttpRequest;
import org.apache.hadoop.hbase.shaded.io.netty.handler.codec.http.HttpResponse;
import org.apache.hadoop.hbase.shaded.io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.hadoop.hbase.shaded.io.netty.handler.codec.http.HttpUtil;
import org.apache.hadoop.hbase.shaded.io.netty.handler.codec.http.HttpVersion;
import org.apache.hadoop.hbase.shaded.io.netty.handler.codec.http.LastHttpContent;
import org.apache.hadoop.hbase.shaded.io.netty.handler.codec.http.QueryStringDecoder;
import org.apache.hadoop.hbase.shaded.io.netty.handler.ssl.SslHandler;
import org.apache.hadoop.hbase.shaded.io.netty.util.CharsetUtil;
import org.apache.hadoop.hbase.shaded.io.netty.util.concurrent.Future;
import org.apache.hadoop.hbase.shaded.io.netty.util.concurrent.GenericFutureListener;
import org.apache.hadoop.hbase.shaded.org.eclipse.jetty.http.HttpHeader;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.SecureIOUtils;
import org.apache.hadoop.mapred.ShuffleHandler;
import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
import org.apache.hadoop.mapreduce.task.reduce.ShuffleHeader;
import org.apache.hadoop.yarn.webapp.MimeType;

/* loaded from: input_file:org/apache/hadoop/mapred/ShuffleChannelHandler.class */
public class ShuffleChannelHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
    private final ShuffleChannelHandlerContext handlerCtx;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/hadoop/mapred/ShuffleChannelHandler$MapOutputInfo.class */
    /**
     * Immutable holder that pairs the path of a map output file with the
     * {@link IndexRecord} that belongs to it.
     */
    public static class MapOutputInfo {
        /** Path of the map output file. */
        final Path mapOutputFileName;

        /** Index record associated with {@link #mapOutputFileName}. */
        final IndexRecord indexRecord;

        /**
         * @param path        path of the map output file
         * @param indexRecord index record associated with that file
         */
        MapOutputInfo(Path path, IndexRecord indexRecord) {
            this.indexRecord = indexRecord;
            this.mapOutputFileName = path;
        }
    }

    /* loaded from: input_file:org/apache/hadoop/mapred/ShuffleChannelHandler$ReduceContext.class */
    /**
     * Per-request state carried through the sequence of map output transfers
     * that answer a single shuffle request.
     *
     * <p>All fields are final; the two counters are {@link AtomicInteger}s that
     * callers mutate through the references returned by the getters.
     */
    public static class ReduceContext {
        private final List<String> mapIds;
        /** Starts at the number of requested map ids. */
        private final AtomicInteger mapsToWait;
        /** Starts at zero. */
        private final AtomicInteger mapsToSend = new AtomicInteger();
        private final int reduceId;
        private final ChannelHandlerContext ctx;
        private final String user;
        private final Map<String, MapOutputInfo> infoMap;
        private final String jobId;
        private final boolean keepAlive;

        /**
         * @param list                  map ids requested by the client
         * @param i                     reduce id
         * @param channelHandlerContext Netty context of the requesting channel
         * @param str                   user name
         * @param map                   map output info keyed by map id
         * @param str2                  job id
         * @param z                     whether the connection is kept alive after the transfer
         */
        ReduceContext(List<String> list, int i, ChannelHandlerContext channelHandlerContext, String str, Map<String, MapOutputInfo> map, String str2, boolean z) {
            final int requestedMaps = list.size();
            this.mapIds = list;
            this.mapsToWait = new AtomicInteger(requestedMaps);
            this.reduceId = i;
            this.jobId = str2;
            this.user = str;
            this.ctx = channelHandlerContext;
            this.infoMap = map;
            this.keepAlive = z;
        }

        /** @return the map ids passed to the constructor (same list instance) */
        public List<String> getMapIds() {
            return this.mapIds;
        }

        /** @return the reduce id */
        public int getReduceId() {
            return this.reduceId;
        }

        /** @return the job id */
        public String getJobId() {
            return this.jobId;
        }

        /** @return the user name */
        public String getUser() {
            return this.user;
        }

        /** @return the Netty context of the requesting channel */
        public ChannelHandlerContext getCtx() {
            return this.ctx;
        }

        /** @return the map output info map passed to the constructor (same map instance) */
        public Map<String, MapOutputInfo> getInfoMap() {
            return this.infoMap;
        }

        /** @return the live "maps to send" counter */
        public AtomicInteger getMapsToSend() {
            return this.mapsToSend;
        }

        /** @return the live "maps to wait" counter */
        public AtomicInteger getMapsToWait() {
            return this.mapsToWait;
        }

        /** @return the keep-alive flag passed to the constructor */
        public boolean getKeepAlive() {
            return this.keepAlive;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/hadoop/mapred/ShuffleChannelHandler$ReduceMapFileCount.class */
    public static class ReduceMapFileCount implements ChannelFutureListener {
        private final ShuffleChannelHandler handler;
        private final ReduceContext reduceContext;

        ReduceMapFileCount(ShuffleChannelHandler shuffleChannelHandler, ReduceContext reduceContext) {
            this.handler = shuffleChannelHandler;
            this.reduceContext = reduceContext;
        }

        @Override // org.apache.hadoop.hbase.shaded.io.netty.util.concurrent.GenericFutureListener
        public void operationComplete(ChannelFuture channelFuture) throws Exception {
            ShuffleHandler.LOG.trace("SendMap operation complete; mapsToWait='{}', channel='{}'", Integer.valueOf(this.reduceContext.getMapsToWait().get()), channelFuture.channel().id());
            if (!channelFuture.isSuccess()) {
                ShuffleHandler.LOG.error("Future is unsuccessful. channel='{}' Cause: ", channelFuture.channel().id(), channelFuture.cause());
                channelFuture.channel().close();
                return;
            }
            if (this.reduceContext.getMapsToWait().decrementAndGet() != 0) {
                ShuffleHandler.LOG.trace("SendMap operation complete, waitCount > 0, invoking sendMap with reduceContext; channel='{}'", channelFuture.channel().id());
                this.handler.sendMap(this.reduceContext);
                return;
            }
            ChannelFuture writeAndFlush = channelFuture.channel().writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
            this.handler.handlerCtx.metrics.operationComplete(channelFuture);
            if (this.reduceContext.getKeepAlive()) {
                ShuffleHandler.LOG.trace("SendMap operation complete, keeping alive the connection; channel='{}'", channelFuture.channel().id());
                ((ShuffleHandler.TimeoutHandler) channelFuture.channel().pipeline().get("timeout")).setEnabledTimeout(true);
            } else {
                ShuffleHandler.LOG.trace("SendMap operation complete, closing connection; channel='{}'", channelFuture.channel().id());
                writeAndFlush.addListener2((GenericFutureListener<? extends Future<? super Void>>) ChannelFutureListener.CLOSE);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a handler bound to the given shared handler context.
     *
     * @param shuffleChannelHandlerContext context object this handler reads its
     *                                     configuration, caches and metrics from
     */
    public ShuffleChannelHandler(ShuffleChannelHandlerContext shuffleChannelHandlerContext) {
        super();
        this.handlerCtx = shuffleChannelHandlerContext;
    }

    /**
     * Flattens a list of comma-separated values into a single list of tokens.
     *
     * @param list values to split, each possibly containing several
     *             comma-separated tokens; may be {@code null}
     * @return a new mutable list with every token in encounter order, or
     *         {@code null} when {@code list} is {@code null}
     */
    private List<String> splitMaps(List<String> list) {
        if (list == null) {
            return null;
        }
        final List<String> tokens = new ArrayList<>();
        for (String commaSeparated : list) {
            for (String token : commaSeparated.split(",")) {
                tokens.add(token);
            }
        }
        return tokens;
    }

    /**
     * Counts the new connection and either accepts it or rejects it when the
     * configured connection limit is exceeded.
     *
     * <p>A limit of zero or less disables the check. A rejected connection
     * receives an error response carrying a {@code Retry-After} header; the
     * channel is then closed by {@code sendError}.
     */
    @Override
    public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
        ShuffleHandler.LOG.debug("Executing channelActive; channel='{}'", channelHandlerContext.channel().id());
        final int connectionCount = this.handlerCtx.activeConnections.incrementAndGet();

        final boolean limitEnabled = this.handlerCtx.maxShuffleConnections > 0;
        if (limitEnabled && connectionCount > this.handlerCtx.maxShuffleConnections) {
            ShuffleHandler.LOG.info(String.format("Current number of shuffle connections (%d) is greater than the max allowed shuffle connections (%d)", Integer.valueOf(this.handlerCtx.allChannels.size()), Integer.valueOf(this.handlerCtx.maxShuffleConnections)));
            final Map<String, String> retryHeaders = new HashMap<>(1);
            retryHeaders.put("Retry-After", String.valueOf(1000L));
            sendError(channelHandlerContext, "", ShuffleHandler.TOO_MANY_REQ_STATUS, retryHeaders);
            return;
        }

        super.channelActive(channelHandlerContext);
        this.handlerCtx.allChannels.add(channelHandlerContext.channel());
        ShuffleHandler.LOG.debug("Added channel: {}, channel id: {}. Accepted number of connections={}", new Object[]{channelHandlerContext.channel(), channelHandlerContext.channel().id(), Integer.valueOf(this.handlerCtx.activeConnections.get())});
    }

    /**
     * Forwards the event to the superclass and then decrements the active
     * connection counter that {@code channelActive} incremented.
     */
    @Override
    public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
        ShuffleHandler.LOG.debug("Executing channelInactive; channel='{}'", channelHandlerContext.channel().id());
        super.channelInactive(channelHandlerContext);
        final int remainingConnections = this.handlerCtx.activeConnections.decrementAndGet();
        ShuffleHandler.LOG.debug("New value of Accepted number of connections={}", Integer.valueOf(remainingConnections));
    }

    /**
     * Handles one shuffle HTTP request.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>reject anything but {@code GET} (405);</li>
     *   <li>require the {@code name}/{@code version} request headers to match
     *       the shuffle defaults (400 otherwise);</li>
     *   <li>parse the {@code keepAlive}, {@code map}, {@code reduce} and job
     *       query parameters (400 on missing, duplicated or malformed values);</li>
     *   <li>authenticate the request via {@code verifyRequest} (401 on failure);</li>
     *   <li>disable the "timeout" pipeline handler, populate the response
     *       headers (500 on failure), write the response head and start
     *       sending map outputs via {@code sendMap}.</li>
     * </ol>
     */
    @Override
    public void channelRead0(ChannelHandlerContext channelHandlerContext, FullHttpRequest fullHttpRequest) {
        final Channel ch = channelHandlerContext.channel();
        ShuffleHandler.LOG.debug("Received HTTP request: {}, channel='{}'", fullHttpRequest, ch.id());
        if (fullHttpRequest.method() != HttpMethod.GET) {
            sendError(channelHandlerContext, HttpResponseStatus.METHOD_NOT_ALLOWED);
            return;
        }

        // Header compatibility check.
        String shuffleVersion = ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION;
        String headerName = ShuffleHeader.DEFAULT_HTTP_HEADER_NAME;
        if (fullHttpRequest.headers() != null) {
            shuffleVersion = fullHttpRequest.headers().get("version");
            headerName = fullHttpRequest.headers().get("name");
            ShuffleHandler.LOG.debug("Received from request header: ShuffleVersion={} header name={}, channel id: {}", new Object[]{shuffleVersion, headerName, ch.id()});
        }
        final boolean compatible = fullHttpRequest.headers() != null
                && ShuffleHeader.DEFAULT_HTTP_HEADER_NAME.equals(headerName)
                && ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION.equals(shuffleVersion);
        if (!compatible) {
            sendError(channelHandlerContext, "Incompatible shuffle request version", HttpResponseStatus.BAD_REQUEST);
            return;
        }

        // Query parameters.
        final Map<String, List<String>> query = new QueryStringDecoder(fullHttpRequest.uri()).parameters();
        final List<String> keepAliveValues = query.get("keepAlive");
        boolean keepAliveParam = false;
        if (keepAliveValues != null && keepAliveValues.size() == 1) {
            keepAliveParam = Boolean.parseBoolean(keepAliveValues.get(0));
            if (ShuffleHandler.LOG.isDebugEnabled()) {
                ShuffleHandler.LOG.debug("KeepAliveParam: {} : {}, channel id: {}", new Object[]{keepAliveValues, Boolean.valueOf(keepAliveParam), ch.id()});
            }
        }
        final List<String> mapIds = splitMaps(query.get("map"));
        final List<String> reduceValues = query.get("reduce");
        final List<String> jobValues = query.get(org.apache.hadoop.mapreduce.JobID.JOB);
        if (ShuffleHandler.LOG.isDebugEnabled()) {
            ShuffleHandler.LOG.debug("RECV: " + fullHttpRequest.uri() + "\n  mapId: " + mapIds + "\n  reduceId: " + reduceValues + "\n  jobId: " + jobValues + "\n  keepAlive: " + keepAliveParam + "\n  channel id: " + ch.id());
        }
        if (mapIds == null || reduceValues == null || jobValues == null) {
            sendError(channelHandlerContext, "Required param job, map and reduce", HttpResponseStatus.BAD_REQUEST);
            return;
        }
        if (reduceValues.size() != 1 || jobValues.size() != 1) {
            sendError(channelHandlerContext, "Too many job/reduce parameters", HttpResponseStatus.BAD_REQUEST);
            return;
        }

        try {
            final int reduceId = Integer.parseInt(reduceValues.get(0));
            final String jobId = jobValues.get(0);
            final String requestUri = fullHttpRequest.uri();
            if (requestUri == null) {
                sendError(channelHandlerContext, HttpResponseStatus.FORBIDDEN);
                return;
            }
            final DefaultHttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
            try {
                // An IOException raised here is reported as 401 by the outer catch.
                final URL requestUrl = new URL("http", "", this.handlerCtx.port, requestUri);
                verifyRequest(jobId, channelHandlerContext, fullHttpRequest, response, requestUrl);

                final Map<String, MapOutputInfo> outputInfoByMapId = new HashMap<>();
                ((ShuffleHandler.TimeoutHandler) ch.pipeline().get("timeout")).setEnabledTimeout(false);
                final String user = this.handlerCtx.userRsrc.get(jobId);
                try {
                    populateHeaders(mapIds, jobId, user, reduceId, response, keepAliveParam, outputInfoByMapId);
                    ch.write(response);
                    final boolean keepAlive = keepAliveParam || this.handlerCtx.connectionKeepAliveEnabled;
                    sendMap(new ReduceContext(mapIds, reduceId, channelHandlerContext, user, outputInfoByMapId, jobId, keepAlive));
                } catch (IOException headerFailure) {
                    ShuffleHandler.LOG.error("Shuffle error while populating headers. Channel id: " + ch.id(), headerFailure);
                    sendError(channelHandlerContext, getErrorMessage(headerFailure), HttpResponseStatus.INTERNAL_SERVER_ERROR);
                }
            } catch (IOException authFailure) {
                ShuffleHandler.LOG.warn("Shuffle failure ", authFailure);
                sendError(channelHandlerContext, authFailure.getMessage(), HttpResponseStatus.UNAUTHORIZED);
            }
        } catch (NumberFormatException badReduce) {
            sendError(channelHandlerContext, "Bad reduce parameter", HttpResponseStatus.BAD_REQUEST);
        } catch (IllegalArgumentException badJob) {
            sendError(channelHandlerContext, "Bad job parameter", HttpResponseStatus.BAD_REQUEST);
        }
    }

    /**
     * Sends the next pending map output of the given request, if any.
     *
     * <p>The "maps to send" counter of the context is used as the index into
     * the requested map ids and is advanced by one per call. The write future
     * of the map output gets a {@link ReduceMapFileCount} listener, which
     * calls this method again for the following map output.
     *
     * <p>On {@link IOException} the error is logged, including its stack
     * trace, and the channel is closed.
     *
     * @param reduceContext state of the request being served
     */
    public void sendMap(ReduceContext reduceContext) {
        final Channel ch = reduceContext.getCtx().channel();
        ShuffleHandler.LOG.trace("Executing sendMap; channel='{}'", ch.id());
        if (reduceContext.getMapsToSend().get() >= reduceContext.getMapIds().size()) {
            return;
        }
        final String mapId = reduceContext.getMapIds().get(reduceContext.getMapsToSend().getAndIncrement());
        try {
            MapOutputInfo mapOutputInfo = reduceContext.getInfoMap().get(mapId);
            if (mapOutputInfo == null) {
                // Not cached while populating the headers: resolve it now.
                mapOutputInfo = getMapOutputInfo(mapId, reduceContext.getReduceId(), reduceContext.getJobId(), reduceContext.getUser());
            }
            ShuffleHandler.LOG.trace("Calling sendMapOutput; channel='{}'", ch.id());
            sendMapOutput(ch, reduceContext.getUser(), mapId, reduceContext.getReduceId(), mapOutputInfo).addListener2((GenericFutureListener<? extends Future<? super Void>>) new ReduceMapFileCount(this, reduceContext));
        } catch (IOException e) {
            // The throwable must be the trailing argument for the logger to
            // print its stack trace; as a placeholder argument only its
            // toString() was logged.
            ShuffleHandler.LOG.error("Shuffle error: {}; channel={}", new Object[]{e.toString(), ch.id(), e});
            ch.close();
        }
    }

    /**
     * Concatenates the message of a throwable with the messages of all
     * throwables in its cause chain, outermost first and without separators.
     *
     * <p>A {@code null} message, at any level, is rendered as the literal
     * string {@code "null"}. Previously a {@code null} top-level message
     * caused a {@link NullPointerException} in the {@link StringBuilder}
     * constructor.
     *
     * @param th the throwable to describe; must not be {@code null}
     * @return the concatenated messages, never {@code null}
     */
    private String getErrorMessage(Throwable th) {
        final StringBuilder sb = new StringBuilder();
        for (Throwable current = th; current != null; current = current.getCause()) {
            // append(String) renders null as "null" instead of throwing.
            sb.append(current.getMessage());
        }
        return sb.toString();
    }

    /**
     * Resolves the data file path and index record of one map output.
     *
     * <p>The attempt paths are read from the handler context's path cache and
     * the index record from its index cache.
     *
     * @param str  map id
     * @param i    reduce id
     * @param str2 job id
     * @param str3 user name
     * @return the data path paired with the index record
     * @throws IOException      if the index lookup fails, or if the path cache
     *                          lookup fails with an {@code IOException} cause
     * @throws RuntimeException wrapping any other cause of a failed path cache
     *                          lookup
     */
    protected MapOutputInfo getMapOutputInfo(String str, int i, String str2, String str3) throws IOException {
        final String mapId = str;
        final int reduceId = i;
        final String jobId = str2;
        final String user = str3;
        try {
            final ShuffleHandler.AttemptPathIdentifier identifier = new ShuffleHandler.AttemptPathIdentifier(jobId, user, mapId);
            final ShuffleHandler.AttemptPathInfo pathInfo = this.handlerCtx.pathCache.get(identifier);
            if (ShuffleHandler.LOG.isDebugEnabled()) {
                ShuffleHandler.LOG.debug("Retrieved pathInfo for " + identifier + " check for corresponding loaded messages to determine whether it was loaded or cached");
            }

            final IndexRecord record = this.handlerCtx.indexCache.getIndexInformation(mapId, reduceId, pathInfo.indexPath, user);
            if (ShuffleHandler.LOG.isDebugEnabled()) {
                ShuffleHandler.LOG.debug("getMapOutputInfo: jobId=" + jobId + ", mapId=" + mapId + ",dataFile=" + pathInfo.dataPath + ", indexFile=" + pathInfo.indexPath);
                ShuffleHandler.LOG.debug("getMapOutputInfo: startOffset={}, partLength={} rawLength={}", new Object[]{Long.valueOf(record.startOffset), Long.valueOf(record.partLength), Long.valueOf(record.rawLength)});
            }
            return new MapOutputInfo(pathInfo.dataPath, record);
        } catch (ExecutionException e) {
            final Throwable cause = e.getCause();
            if (cause instanceof IOException) {
                throw (IOException) cause;
            }
            throw new RuntimeException(cause);
        }
    }

    /**
     * Computes the total content length of the response and sets the response
     * headers accordingly.
     *
     * <p>For every requested map id the method resolves the map output info,
     * caches it in {@code map} while the cache is below the configured size,
     * adds the partition length plus the serialized shuffle header length to
     * the content length, and opens and immediately closes the data file.
     * That open fails before any response data is written, so a failure can
     * still be reported as an HTTP error.
     *
     * <p>Fix: the serialized header length is now read from the same
     * {@link DataOutputBuffer} the header was written to; the previous code
     * referenced an undefined variable and did not compile.
     *
     * @param list         requested map ids
     * @param str          job id
     * @param str2         user name
     * @param i            reduce id
     * @param httpResponse response whose headers are populated
     * @param z            keep-alive flag taken from the request
     * @param map          cache of map output info keyed by map id, filled here
     * @throws IOException if map output info cannot be resolved, the header
     *                     cannot be serialized, or the data file cannot be
     *                     opened for the user
     */
    protected void populateHeaders(List<String> list, String str, String str2, int i, HttpResponse httpResponse, boolean z, Map<String, MapOutputInfo> map) throws IOException {
        final String jobId = str;
        final String user = str2;
        final int reduceId = i;

        long contentLength = 0;
        for (String mapId : list) {
            final MapOutputInfo outputInfo = getMapOutputInfo(mapId, reduceId, jobId, user);
            if (map.size() < this.handlerCtx.mapOutputMetaInfoCacheSize) {
                map.put(mapId, outputInfo);
            }

            // Serialize the header only to learn how many bytes it occupies.
            final DataOutputBuffer headerBytes = new DataOutputBuffer();
            final ShuffleHeader header = new ShuffleHeader(mapId, outputInfo.indexRecord.partLength, outputInfo.indexRecord.rawLength, reduceId);
            header.write(headerBytes);
            contentLength += outputInfo.indexRecord.partLength;
            contentLength += headerBytes.getLength();

            // Verify access to the data file now so that a failure is reported
            // before the response head is written.
            final File spillFile = new File(outputInfo.mapOutputFileName.toString());
            final RandomAccessFile spill = SecureIOUtils.openForRandomRead(spillFile, "r", user, null);
            spill.close();
        }

        setResponseHeaders(httpResponse, z, contentLength);

        if (ShuffleHandler.AUDITLOG.isDebugEnabled()) {
            final StringBuilder sb = new StringBuilder("shuffle for ");
            sb.append(jobId).append(" reducer ").append(reduceId);
            sb.append(" length ").append(contentLength);
            if (ShuffleHandler.AUDITLOG.isTraceEnabled()) {
                // For trace level logging, append the list of mappers.
                sb.append(" mappers: ").append(list);
                ShuffleHandler.AUDITLOG.trace(sb.toString());
            } else {
                ShuffleHandler.AUDITLOG.debug(sb.toString());
            }
        }
    }

    /**
     * Sets the connection-related headers and the content length on the
     * response.
     *
     * <p>Keep-alive headers are written when either the handler context
     * enables keep-alive or the request asked for it; otherwise
     * {@code Connection: close} is written.
     *
     * @param httpResponse response to modify
     * @param z            keep-alive flag taken from the request
     * @param j            content length in bytes
     */
    protected void setResponseHeaders(HttpResponse httpResponse, boolean z, long j) {
        final boolean keepAlive = z || this.handlerCtx.connectionKeepAliveEnabled;
        if (!keepAlive) {
            httpResponse.headers().set(HttpHeader.CONNECTION.asString(), "close");
        } else {
            httpResponse.headers().set(HttpHeader.CONNECTION.asString(), (Object) HttpHeader.KEEP_ALIVE.asString());
            final String timeoutValue = "timeout=" + this.handlerCtx.connectionKeepAliveTimeOut;
            httpResponse.headers().set(HttpHeader.KEEP_ALIVE.asString(), (Object) timeoutValue);
        }
        HttpUtil.setContentLength(httpResponse, j);
    }

    /**
     * Authenticates a shuffle request and writes the reply hash plus the
     * shuffle name/version headers to the response.
     *
     * <p>Fix: the debug statements log only the tail of a hash. The previous
     * inline {@code substring} threw {@link StringIndexOutOfBoundsException}
     * for hashes shorter than two characters, so a request with an empty hash
     * header escaped as a runtime exception instead of reaching
     * {@code verifyReply}.
     *
     * @param str                   job id whose token secret is used
     * @param channelHandlerContext context of the requesting channel (logging only)
     * @param httpRequest           request carrying the URL hash header
     * @param httpResponse          response that receives the reply headers
     * @param url                   URL the request hash is verified against
     * @throws IOException if no secret is known for the job, the hash header
     *                     is missing, or the hash verification fails
     */
    protected void verifyRequest(String str, ChannelHandlerContext channelHandlerContext, HttpRequest httpRequest, HttpResponse httpResponse, URL url) throws IOException {
        final SecretKey tokenSecret = this.handlerCtx.secretManager.retrieveTokenSecret(str);
        if (tokenSecret == null) {
            ShuffleHandler.LOG.info("Request for unknown token {}, channel id: {}", str, channelHandlerContext.channel().id());
            throw new IOException("Could not find jobid");
        }

        // Message the client is expected to have hashed.
        final String encryptedUrl = SecureShuffleUtils.buildMsgFrom(url);
        // Hash provided by the client.
        final String urlHash = httpRequest.headers().get(SecureShuffleUtils.HTTP_HEADER_URL_HASH);
        if (urlHash == null) {
            ShuffleHandler.LOG.info("Missing header hash for {}, channel id: {}", str, channelHandlerContext.channel().id());
            throw new IOException("fetcher cannot be authenticated");
        }
        if (ShuffleHandler.LOG.isDebugEnabled()) {
            ShuffleHandler.LOG.debug("Verifying request. encryptedURL:{}, hash:{}, channel id: {}", new Object[]{encryptedUrl, hashTail(urlHash), channelHandlerContext.channel().id()});
        }

        // Throws IOException when the hash does not match.
        SecureShuffleUtils.verifyReply(urlHash, encryptedUrl, tokenSecret);

        // Verification passed: hash the client's hash and send it back.
        final String reply = SecureShuffleUtils.generateHash(urlHash.getBytes(StandardCharsets.UTF_8), tokenSecret);
        httpResponse.headers().set(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH, (Object) reply);
        httpResponse.headers().set("name", ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
        httpResponse.headers().set("version", ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
        if (ShuffleHandler.LOG.isDebugEnabled()) {
            ShuffleHandler.LOG.debug("Fetcher request verified. encryptedURL: {}, reply: {}, channel id: {}", new Object[]{encryptedUrl, reply, channelHandlerContext.channel().id()}.length == 3 ? new Object[]{encryptedUrl, hashTail(reply), channelHandlerContext.channel().id()} : new Object[0]);
        }
    }

    /**
     * Returns the fragment of a hash that is written to the debug log: the
     * second half of the string without its final character. Yields an empty
     * string for inputs shorter than two characters instead of throwing.
     */
    private static String hashTail(String hash) {
        final int length = hash.length();
        final int start = length - (length / 2);
        final int end = Math.max(start, length - 1);
        return hash.substring(start, end);
    }

    /**
     * Serializes a shuffle header into a Netty buffer.
     *
     * <p>The returned buffer wraps the written portion of the serialization
     * buffer's backing array rather than copying it.
     *
     * @param shuffleHeader header to serialize
     * @return buffer wrapping the serialized header bytes
     * @throws IOException if writing the header fails
     */
    public static ByteBuf shuffleHeaderToBytes(ShuffleHeader shuffleHeader) throws IOException {
        final DataOutputBuffer serialized = new DataOutputBuffer();
        shuffleHeader.write(serialized);
        final byte[] backingArray = serialized.getData();
        final int writtenLength = serialized.getLength();
        return Unpooled.wrappedBuffer(backingArray, 0, writtenLength);
    }

    /**
     * Writes one map output (shuffle header followed by the partition data)
     * to the channel.
     *
     * <p>When the pipeline contains an {@link SslHandler} the data is sent as
     * a {@code FadvisedChunkedFile}; otherwise as a {@code FadvisedFileRegion},
     * which is deallocated by a listener once the write completes.
     *
     * @param channel       channel to write to
     * @param str           user name used to open the data file
     * @param str2          map id
     * @param i             reduce id
     * @param mapOutputInfo data file path and index record of the map output
     * @return the future of the data write
     * @throws IOException if the header cannot be serialized or the data file
     *                     cannot be opened
     */
    protected ChannelFuture sendMapOutput(Channel channel, String str, String str2, int i, MapOutputInfo mapOutputInfo) throws IOException {
        final String user = str;
        final String mapId = str2;
        final int reduceId = i;
        final IndexRecord info = mapOutputInfo.indexRecord;

        final ShuffleHeader header = new ShuffleHeader(mapId, info.partLength, info.rawLength, reduceId);
        channel.write(shuffleHeaderToBytes(header));

        final File spillFile = new File(mapOutputInfo.mapOutputFileName.toString());
        final RandomAccessFile spill = SecureIOUtils.openForRandomRead(spillFile, "r", user, null);

        final ChannelFuture writeFuture;
        if (channel.pipeline().get(SslHandler.class) != null) {
            // SSL in the pipeline: send the data as a chunked file.
            final FadvisedChunkedFile chunk = new FadvisedChunkedFile(spill, info.startOffset, info.partLength, this.handlerCtx.sslFileBufferSize, this.handlerCtx.manageOsCache, this.handlerCtx.readaheadLength, this.handlerCtx.readaheadPool, spillFile.getAbsolutePath());
            writeFuture = channel.writeAndFlush(chunk);
        } else {
            final FadvisedFileRegion partition = new FadvisedFileRegion(spill, info.startOffset, info.partLength, this.handlerCtx.manageOsCache, this.handlerCtx.readaheadLength, this.handlerCtx.readaheadPool, spillFile.getAbsolutePath(), this.handlerCtx.shuffleBufferSize, this.handlerCtx.shuffleTransferToAllowed);
            writeFuture = channel.writeAndFlush(partition);
            writeFuture.addListener2(completed -> {
                if (completed.isSuccess()) {
                    partition.transferSuccessful();
                }
                partition.deallocate();
            });
        }

        this.handlerCtx.metrics.shuffleConnections.incr();
        this.handlerCtx.metrics.shuffleOutputBytes.incr(info.partLength);
        return writeFuture;
    }

    /**
     * Sends an error response with an empty body and closes the channel.
     *
     * @param channelHandlerContext context of the channel to answer
     * @param httpResponseStatus    HTTP status of the response
     */
    protected void sendError(ChannelHandlerContext channelHandlerContext, HttpResponseStatus httpResponseStatus) {
        sendError(channelHandlerContext, "", httpResponseStatus);
    }

    /**
     * Sends an error response with the given body and closes the channel.
     *
     * @param channelHandlerContext context of the channel to answer
     * @param str                   response body; {@code null} is treated as empty
     * @param httpResponseStatus    HTTP status of the response
     */
    protected void sendError(ChannelHandlerContext channelHandlerContext, String str, HttpResponseStatus httpResponseStatus) {
        sendError(channelHandlerContext, str, httpResponseStatus, Collections.emptyMap());
    }

    /**
     * Sends an error response with the given body and extra headers, then
     * closes the channel once the response has been written.
     *
     * <p>The response always carries a text content type, the shuffle
     * name/version headers and a content length matching the body. Entries of
     * {@code map} are applied afterwards and may override those headers.
     *
     * <p>A {@code null} body or header map is treated as empty. Callers pass
     * exception messages here, which may be {@code null}, and a {@code null}
     * body would otherwise fail while building the response buffer.
     *
     * @param channelHandlerContext context of the channel to answer
     * @param str                   response body; {@code null} is treated as empty
     * @param httpResponseStatus    HTTP status of the response
     * @param map                   additional response headers; {@code null}
     *                              is treated as empty
     */
    protected void sendError(ChannelHandlerContext channelHandlerContext, String str, HttpResponseStatus httpResponseStatus, Map<String, String> map) {
        final String body = (str == null) ? "" : str;
        final Map<String, String> extraHeaders = (map == null) ? Collections.<String, String>emptyMap() : map;

        final DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, httpResponseStatus, Unpooled.copiedBuffer(body, CharsetUtil.UTF_8));
        response.headers().set(HttpHeaderNames.CONTENT_TYPE, MimeType.TEXT);
        // Put shuffle version into the HTTP header.
        response.headers().set("name", (Object) ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
        response.headers().set("version", (Object) ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
        for (Map.Entry<String, String> header : extraHeaders.entrySet()) {
            response.headers().set(header.getKey(), (Object) header.getValue());
        }
        HttpUtil.setContentLength(response, response.content().readableBytes());

        // Close the connection as soon as the error message is sent.
        channelHandlerContext.channel().writeAndFlush(response).addListener2((GenericFutureListener<? extends Future<? super Void>>) ChannelFutureListener.CLOSE);
    }

    /**
     * Translates pipeline exceptions into log entries and, where appropriate,
     * HTTP error responses.
     *
     * <ul>
     *   <li>{@link TooLongFrameException}: answered with 400.</li>
     *   <li>{@link ClosedChannelException}, or an {@link IOException} whose
     *       message matches {@code ShuffleHandler.IGNORABLE_ERROR_MESSAGE}:
     *       logged at debug level and otherwise ignored.</li>
     *   <li>Anything else: logged as an error and, if the channel is still
     *       active, answered with 500.</li>
     * </ul>
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) throws Exception {
        final Channel ch = channelHandlerContext.channel();

        if (th instanceof TooLongFrameException) {
            ShuffleHandler.LOG.trace("TooLongFrameException, channel id: {}", ch.id());
            sendError(channelHandlerContext, HttpResponseStatus.BAD_REQUEST);
            return;
        }

        // ClosedChannelException is itself an IOException, so it is tested directly.
        if (th instanceof ClosedChannelException) {
            ShuffleHandler.LOG.debug("Ignoring closed channel error, channel id: " + ch.id(), th);
            return;
        }

        final boolean ignorableIoError = th instanceof IOException
                && ShuffleHandler.IGNORABLE_ERROR_MESSAGE.matcher(String.valueOf(th.getMessage())).matches();
        if (ignorableIoError) {
            ShuffleHandler.LOG.debug("Ignoring client socket close, channel id: " + ch.id(), th);
            return;
        }

        ShuffleHandler.LOG.error("Shuffle error. Channel id: " + ch.id(), th);
        if (ch.isActive()) {
            sendError(channelHandlerContext, HttpResponseStatus.INTERNAL_SERVER_ERROR);
        }
    }
}
