/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.auxservices;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.net.InetSocketAddress;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import javax.crypto.SecretKey;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.ReadaheadPool;
import org.apache.hadoop.io.SecureIOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapred.proto.ShuffleHandlerRecoveryProtos;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MutableCounterInt;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
import org.apache.hadoop.security.proto.SecurityProtos;
import org.apache.hadoop.security.ssl.SSLFactory;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.thirdparty.protobuf.ByteString;
import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos;
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
import org.apache.hadoop.yarn.server.api.AuxiliaryService;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
import org.apache.hadoop.yarn.server.utils.LeveldbIterator;
import org.apache.tez.auxservices.FadvisedChunkedFile;
import org.apache.tez.auxservices.FadvisedFileRegion;
import org.apache.tez.auxservices.IndexCache;
import org.apache.tez.common.security.JobTokenIdentifier;
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.runtime.library.common.security.SecureShuffleUtils;
import org.apache.tez.runtime.library.common.shuffle.api.ShuffleHandlerError;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleHeader;
import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
import org.apache.tez.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.tez.shaded.com.google.common.base.Charsets;
import org.apache.tez.shaded.com.google.common.cache.CacheBuilder;
import org.apache.tez.shaded.com.google.common.cache.CacheLoader;
import org.apache.tez.shaded.com.google.common.cache.LoadingCache;
import org.apache.tez.shaded.com.google.common.cache.RemovalListener;
import org.apache.tez.shaded.com.google.common.cache.RemovalNotification;
import org.apache.tez.shaded.com.google.common.cache.Weigher;
import org.apache.tez.shaded.io.netty.bootstrap.ServerBootstrap;
import org.apache.tez.shaded.io.netty.buffer.ByteBuf;
import org.apache.tez.shaded.io.netty.buffer.Unpooled;
import org.apache.tez.shaded.io.netty.channel.Channel;
import org.apache.tez.shaded.io.netty.channel.ChannelDuplexHandler;
import org.apache.tez.shaded.io.netty.channel.ChannelFuture;
import org.apache.tez.shaded.io.netty.channel.ChannelFutureListener;
import org.apache.tez.shaded.io.netty.channel.ChannelHandler;
import org.apache.tez.shaded.io.netty.channel.ChannelHandlerContext;
import org.apache.tez.shaded.io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.tez.shaded.io.netty.channel.ChannelInitializer;
import org.apache.tez.shaded.io.netty.channel.ChannelOption;
import org.apache.tez.shaded.io.netty.channel.ChannelPipeline;
import org.apache.tez.shaded.io.netty.channel.group.ChannelGroup;
import org.apache.tez.shaded.io.netty.channel.group.DefaultChannelGroup;
import org.apache.tez.shaded.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.tez.shaded.io.netty.channel.socket.nio.NioServerSocketChannel;
import org.apache.tez.shaded.io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.tez.shaded.io.netty.handler.codec.TooLongFrameException;
import org.apache.tez.shaded.io.netty.handler.codec.http.DefaultFullHttpResponse;
import org.apache.tez.shaded.io.netty.handler.codec.http.DefaultHttpResponse;
import org.apache.tez.shaded.io.netty.handler.codec.http.FullHttpResponse;
import org.apache.tez.shaded.io.netty.handler.codec.http.HttpMethod;
import org.apache.tez.shaded.io.netty.handler.codec.http.HttpObjectAggregator;
import org.apache.tez.shaded.io.netty.handler.codec.http.HttpRequest;
import org.apache.tez.shaded.io.netty.handler.codec.http.HttpRequestDecoder;
import org.apache.tez.shaded.io.netty.handler.codec.http.HttpResponse;
import org.apache.tez.shaded.io.netty.handler.codec.http.HttpResponseEncoder;
import org.apache.tez.shaded.io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.tez.shaded.io.netty.handler.codec.http.HttpVersion;
import org.apache.tez.shaded.io.netty.handler.codec.http.LastHttpContent;
import org.apache.tez.shaded.io.netty.handler.codec.http.QueryStringDecoder;
import org.apache.tez.shaded.io.netty.handler.logging.LogLevel;
import org.apache.tez.shaded.io.netty.handler.logging.LoggingHandler;
import org.apache.tez.shaded.io.netty.handler.ssl.SslHandler;
import org.apache.tez.shaded.io.netty.handler.stream.ChunkedWriteHandler;
import org.apache.tez.shaded.io.netty.handler.timeout.IdleState;
import org.apache.tez.shaded.io.netty.handler.timeout.IdleStateEvent;
import org.apache.tez.shaded.io.netty.handler.timeout.IdleStateHandler;
import org.apache.tez.shaded.io.netty.util.CharsetUtil;
import org.apache.tez.shaded.io.netty.util.concurrent.GlobalEventExecutor;
import org.fusesource.leveldbjni.JniDBFactory;
import org.fusesource.leveldbjni.internal.NativeDB;
import org.iq80.leveldb.DB;
import org.iq80.leveldb.DBException;
import org.iq80.leveldb.Logger;
import org.iq80.leveldb.Options;
import org.slf4j.LoggerFactory;

public class ShuffleHandler
extends AuxiliaryService {
    private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(ShuffleHandler.class);
    private static final org.slf4j.Logger AUDITLOG = LoggerFactory.getLogger((String)(ShuffleHandler.class.getName() + ".audit"));
    public static final String SHUFFLE_MANAGE_OS_CACHE = "tez.shuffle.manage.os.cache";
    public static final boolean DEFAULT_SHUFFLE_MANAGE_OS_CACHE = true;
    public static final String SHUFFLE_READAHEAD_BYTES = "tez.shuffle.readahead.bytes";
    public static final int DEFAULT_SHUFFLE_READAHEAD_BYTES = 0x400000;
    public static final String USERCACHE = "usercache";
    public static final String APPCACHE = "appcache";
    private static final Pattern IGNORABLE_ERROR_MESSAGE = Pattern.compile("^.*(?:connection.*reset|connection.*closed|broken.*pipe).*$", 2);
    private static final String STATE_DB_NAME = "tez_shuffle_state";
    private static final String STATE_DB_SCHEMA_VERSION_KEY = "shuffle-schema-version";
    protected static final Version CURRENT_VERSION_INFO = Version.newInstance((int)1, (int)0);
    private static final String DATA_FILE_NAME = "file.out";
    private static final String INDEX_FILE_NAME = "file.out.index";
    private int port;
    private NioEventLoopGroup bossGroup;
    private NioEventLoopGroup workerGroup;
    private final ChannelGroup accepted = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    private int sslFileBufferSize;
    private Shuffle SHUFFLE;
    private SSLFactory sslFactory;
    private boolean manageOsCache;
    private int readaheadLength;
    private int maxShuffleConnections;
    private int shuffleBufferSize;
    private boolean shuffleTransferToAllowed;
    private int maxSessionOpenFiles;
    private ReadaheadPool readaheadPool = ReadaheadPool.getInstance();
    private Map<String, String> userRsrc;
    private JobTokenSecretManager secretManager;
    private DB stateDb = null;
    public static final String TEZ_SHUFFLE_SERVICEID = "tez_shuffle";
    public static final String SHUFFLE_PORT_CONFIG_KEY = "tez.shuffle.port";
    public static final int DEFAULT_SHUFFLE_PORT = 13563;
    public static final String SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED = "tez.shuffle.connection-keep-alive.enable";
    public static final boolean DEFAULT_SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED = false;
    public static final String SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT = "tez.shuffle.connection-keep-alive.timeout";
    public static final int DEFAULT_SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT = 5;
    public static final String SHUFFLE_MAPOUTPUT_META_INFO_CACHE_SIZE = "tez.shuffle.mapoutput-info.meta.cache.size";
    public static final int DEFAULT_SHUFFLE_MAPOUTPUT_META_INFO_CACHE_SIZE = 1000;
    public static final String CONNECTION_CLOSE = "close";
    public static final String SHUFFLE_SSL_ENABLED_KEY = "tez.shuffle.ssl.enabled";
    public static final boolean SHUFFLE_SSL_ENABLED_DEFAULT = false;
    public static final String SUFFLE_SSL_FILE_BUFFER_SIZE_KEY = "tez.shuffle.ssl.file.buffer.size";
    public static final int DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE = 61440;
    public static final String MAX_SHUFFLE_CONNECTIONS = "tez.shuffle.max.connections";
    public static final int DEFAULT_MAX_SHUFFLE_CONNECTIONS = 0;
    public static final String MAX_SHUFFLE_THREADS = "tez.shuffle.max.threads";
    public static final int DEFAULT_MAX_SHUFFLE_THREADS = 0;
    public static final String SHUFFLE_BUFFER_SIZE = "tez.shuffle.transfer.buffer.size";
    public static final int DEFAULT_SHUFFLE_BUFFER_SIZE = 131072;
    public static final String SHUFFLE_TRANSFERTO_ALLOWED = "tez.shuffle.transferTo.allowed";
    public static final boolean DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED = true;
    public static final boolean WINDOWS_DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED = false;
    private static final String TIMEOUT_HANDLER = "timeout";
    public static final String SHUFFLE_MAX_SESSION_OPEN_FILES = "tez.shuffle.max.session-open-files";
    public static final int DEFAULT_SHUFFLE_MAX_SESSION_OPEN_FILES = 3;
    public static final String SHUFFLE_LISTEN_QUEUE_SIZE = "tez.shuffle.listen.queue.size";
    public static final int DEFAULT_SHUFFLE_LISTEN_QUEUE_SIZE = 128;
    boolean connectionKeepAliveEnabled = false;
    private int connectionKeepAliveTimeOut;
    private int mapOutputMetaInfoCacheSize;
    final ShuffleMetrics metrics;

    ShuffleHandler(MetricsSystem ms) {
        super(TEZ_SHUFFLE_SERVICEID);
        this.metrics = (ShuffleMetrics)ms.register((Object)new ShuffleMetrics());
    }

    public ShuffleHandler() {
        this(DefaultMetricsSystem.instance());
    }

    public static ByteBuffer serializeMetaData(int port) throws IOException {
        DataOutputBuffer portDob = new DataOutputBuffer();
        portDob.writeInt(port);
        ByteBuffer buf = ByteBuffer.wrap(portDob.getData(), 0, portDob.getLength());
        portDob.close();
        return buf;
    }

    public static int deserializeMetaData(ByteBuffer meta) throws IOException {
        DataInputByteBuffer in = new DataInputByteBuffer();
        in.reset(new ByteBuffer[]{meta});
        int port = in.readInt();
        in.close();
        return port;
    }

    public static ByteBuffer serializeServiceData(Token<JobTokenIdentifier> jobToken) throws IOException {
        DataOutputBuffer jobToken_dob = new DataOutputBuffer();
        jobToken.write((DataOutput)jobToken_dob);
        return ByteBuffer.wrap(jobToken_dob.getData(), 0, jobToken_dob.getLength());
    }

    static Token<JobTokenIdentifier> deserializeServiceData(ByteBuffer secret) throws IOException {
        DataInputByteBuffer in = new DataInputByteBuffer();
        in.reset(new ByteBuffer[]{secret});
        Token jt = new Token();
        jt.readFields((DataInput)in);
        return jt;
    }

    public int getPort() {
        return this.port;
    }

    public void initializeApplication(ApplicationInitializationContext context) {
        String user = context.getUser();
        ApplicationId appId = context.getApplicationId();
        ByteBuffer secret = context.getApplicationDataForService();
        try {
            Token<JobTokenIdentifier> jt = ShuffleHandler.deserializeServiceData(secret);
            JobID jobId = new JobID(Long.toString(appId.getClusterTimestamp()), appId.getId());
            this.recordJobShuffleInfo(jobId, user, jt);
        }
        catch (IOException e) {
            LOG.error("Error during initApp", (Throwable)e);
        }
    }

    public void stopApplication(ApplicationTerminationContext context) {
        ApplicationId appId = context.getApplicationId();
        JobID jobId = new JobID(Long.toString(appId.getClusterTimestamp()), appId.getId());
        try {
            this.removeJobShuffleInfo(jobId);
        }
        catch (IOException e) {
            LOG.error("Error during stopApp", (Throwable)e);
        }
    }

    protected void serviceInit(Configuration conf) throws Exception {
        this.manageOsCache = conf.getBoolean(SHUFFLE_MANAGE_OS_CACHE, true);
        this.readaheadLength = conf.getInt(SHUFFLE_READAHEAD_BYTES, 0x400000);
        this.maxShuffleConnections = conf.getInt(MAX_SHUFFLE_CONNECTIONS, 0);
        int maxShuffleThreads = conf.getInt(MAX_SHUFFLE_THREADS, 0);
        if (maxShuffleThreads == 0) {
            maxShuffleThreads = 2 * Runtime.getRuntime().availableProcessors();
        }
        this.shuffleBufferSize = conf.getInt(SHUFFLE_BUFFER_SIZE, 131072);
        this.shuffleTransferToAllowed = conf.getBoolean(SHUFFLE_TRANSFERTO_ALLOWED, !Shell.WINDOWS);
        this.maxSessionOpenFiles = conf.getInt(SHUFFLE_MAX_SESSION_OPEN_FILES, 3);
        String BOSS_THREAD_NAME_PREFIX = "Tez Shuffle Handler Boss #";
        final AtomicInteger bossThreadCounter = new AtomicInteger(0);
        this.bossGroup = new NioEventLoopGroup(1, new ThreadFactory(){

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "Tez Shuffle Handler Boss #" + bossThreadCounter.incrementAndGet());
            }
        });
        String WORKER_THREAD_NAME_PREFIX = "Tez Shuffle Handler Worker #";
        final AtomicInteger workerThreadCounter = new AtomicInteger(0);
        this.workerGroup = new NioEventLoopGroup(maxShuffleThreads, new ThreadFactory(){

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "Tez Shuffle Handler Worker #" + workerThreadCounter.incrementAndGet());
            }
        });
        this.port = conf.getInt(SHUFFLE_PORT_CONFIG_KEY, 13563);
        super.serviceInit((Configuration)new YarnConfiguration(conf));
    }

    protected void serviceStart() throws Exception {
        Configuration conf = this.getConfig();
        this.userRsrc = new ConcurrentHashMap<String, String>();
        this.secretManager = new JobTokenSecretManager();
        this.recoverState(conf);
        ServerBootstrap bootstrap = ((ServerBootstrap)((ServerBootstrap)((ServerBootstrap)new ServerBootstrap().channel(NioServerSocketChannel.class)).group(this.bossGroup, this.workerGroup).localAddress(this.port)).option(ChannelOption.SO_BACKLOG, conf.getInt(SHUFFLE_LISTEN_QUEUE_SIZE, 128))).childOption(ChannelOption.SO_KEEPALIVE, true);
        this.initPipeline(bootstrap, conf);
        Channel ch = bootstrap.bind().sync().channel();
        this.accepted.add(ch);
        this.port = ((InetSocketAddress)ch.localAddress()).getPort();
        conf.set(SHUFFLE_PORT_CONFIG_KEY, Integer.toString(this.port));
        this.SHUFFLE.setPort(this.port);
        LOG.info(this.getName() + " listening on port " + this.port);
        super.serviceStart();
        this.sslFileBufferSize = conf.getInt(SUFFLE_SSL_FILE_BUFFER_SIZE_KEY, 61440);
        this.connectionKeepAliveEnabled = conf.getBoolean(SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED, false);
        this.connectionKeepAliveTimeOut = Math.max(1, conf.getInt(SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT, 5));
        this.mapOutputMetaInfoCacheSize = Math.max(1, conf.getInt(SHUFFLE_MAPOUTPUT_META_INFO_CACHE_SIZE, 1000));
    }

    private void initPipeline(ServerBootstrap bootstrap, Configuration conf) throws Exception {
        this.SHUFFLE = this.getShuffle(conf);
        if (conf.getBoolean(SHUFFLE_SSL_ENABLED_KEY, false)) {
            LOG.info("Encrypted shuffle is enabled.");
            this.sslFactory = new SSLFactory(SSLFactory.Mode.SERVER, conf);
            this.sslFactory.init();
        }
        ChannelInitializer<NioSocketChannel> channelInitializer = new ChannelInitializer<NioSocketChannel>(){

            @Override
            public void initChannel(NioSocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                if (ShuffleHandler.this.sslFactory != null) {
                    pipeline.addLast("ssl", (ChannelHandler)new SslHandler(ShuffleHandler.this.sslFactory.createSSLEngine()));
                }
                if (LOG.isDebugEnabled()) {
                    pipeline.addLast("loggingHandler", (ChannelHandler)new LoggingHandler(LogLevel.DEBUG));
                }
                pipeline.addLast("decoder", (ChannelHandler)new HttpRequestDecoder());
                pipeline.addLast("aggregator", (ChannelHandler)new HttpObjectAggregator(65536));
                pipeline.addLast("encoder", (ChannelHandler)new HttpResponseEncoder());
                pipeline.addLast("chunking", (ChannelHandler)new ChunkedWriteHandler());
                pipeline.addLast("shuffle", (ChannelHandler)ShuffleHandler.this.SHUFFLE);
                pipeline.addLast("idle", (ChannelHandler)new IdleStateHandler(0, ShuffleHandler.this.connectionKeepAliveTimeOut, 0));
                pipeline.addLast(ShuffleHandler.TIMEOUT_HANDLER, (ChannelHandler)new TimeoutHandler());
            }
        };
        bootstrap.childHandler(channelInitializer);
    }

    private void destroyPipeline() {
        if (this.sslFactory != null) {
            this.sslFactory.destroy();
        }
    }

    protected void serviceStop() throws Exception {
        this.accepted.close().awaitUninterruptibly(10L, TimeUnit.SECONDS);
        if (this.bossGroup != null) {
            this.bossGroup.shutdownGracefully();
        }
        if (this.workerGroup != null) {
            this.workerGroup.shutdownGracefully();
        }
        this.destroyPipeline();
        if (this.stateDb != null) {
            this.stateDb.close();
        }
        super.serviceStop();
    }

    public synchronized ByteBuffer getMetaData() {
        try {
            return ShuffleHandler.serializeMetaData(this.port);
        }
        catch (IOException e) {
            LOG.error("Error during getMeta", (Throwable)e);
            return null;
        }
    }

    protected Shuffle getShuffle(Configuration conf) {
        return new Shuffle(conf);
    }

    protected JobTokenSecretManager getSecretManager() {
        return this.secretManager;
    }

    private void recoverState(Configuration conf) throws IOException {
        Path recoveryRoot = this.getRecoveryPath();
        if (recoveryRoot != null) {
            this.startStore(recoveryRoot);
            Pattern jobPattern = Pattern.compile("job_[0-9]+_[0-9]+");
            try (LeveldbIterator iter = null;){
                iter = new LeveldbIterator(this.stateDb);
                iter.seek(JniDBFactory.bytes((String)"job"));
                while (iter.hasNext()) {
                    Map.Entry entry = iter.next();
                    String key = JniDBFactory.asString((byte[])((byte[])entry.getKey()));
                    if (!jobPattern.matcher(key).matches()) {
                        break;
                    }
                    this.recoverJobShuffleInfo(key, (byte[])entry.getValue());
                }
            }
        }
    }

    private void startStore(Path recoveryRoot) throws IOException {
        Options options = new Options();
        options.createIfMissing(false);
        options.logger((Logger)new LevelDBLogger());
        Path dbPath = new Path(recoveryRoot, STATE_DB_NAME);
        LOG.info("Using state database at " + dbPath + " for recovery");
        File dbfile = new File(dbPath.toString());
        try {
            this.stateDb = JniDBFactory.factory.open(dbfile, options);
        }
        catch (NativeDB.DBException e) {
            if (e.isNotFound() || e.getMessage().contains(" does not exist ")) {
                LOG.info("Creating state database at " + dbfile);
                options.createIfMissing(true);
                try {
                    this.stateDb = JniDBFactory.factory.open(dbfile, options);
                    this.storeVersion();
                }
                catch (DBException dbExc) {
                    throw new IOException("Unable to create state store", dbExc);
                }
            }
            throw e;
        }
        this.checkVersion();
    }

    @VisibleForTesting
    Version loadVersion() throws IOException {
        byte[] data = this.stateDb.get(JniDBFactory.bytes((String)STATE_DB_SCHEMA_VERSION_KEY));
        if (data == null || data.length == 0) {
            return this.getCurrentVersion();
        }
        VersionPBImpl version = new VersionPBImpl(YarnServerCommonProtos.VersionProto.parseFrom((byte[])data));
        return version;
    }

    private void storeSchemaVersion(Version version) throws IOException {
        String key = STATE_DB_SCHEMA_VERSION_KEY;
        byte[] data = ((VersionPBImpl)version).getProto().toByteArray();
        try {
            this.stateDb.put(JniDBFactory.bytes((String)key), data);
        }
        catch (DBException e) {
            throw new IOException(e.getMessage(), e);
        }
    }

    private void storeVersion() throws IOException {
        this.storeSchemaVersion(CURRENT_VERSION_INFO);
    }

    @VisibleForTesting
    void storeVersion(Version version) throws IOException {
        this.storeSchemaVersion(version);
    }

    protected Version getCurrentVersion() {
        return CURRENT_VERSION_INFO;
    }

    private void checkVersion() throws IOException {
        Version loadedVersion = this.loadVersion();
        LOG.info("Loaded state DB schema version info " + loadedVersion);
        if (loadedVersion.equals((Object)this.getCurrentVersion())) {
            return;
        }
        if (!loadedVersion.isCompatibleTo(this.getCurrentVersion())) {
            throw new IOException("Incompatible version for state DB schema: expecting DB schema version " + this.getCurrentVersion() + ", but loading version " + loadedVersion);
        }
        LOG.info("Storing state DB schedma version info " + this.getCurrentVersion());
        this.storeVersion();
    }

    private void addJobToken(JobID jobId, String user, Token<JobTokenIdentifier> jobToken) {
        this.userRsrc.put(jobId.toString(), user);
        this.getSecretManager().addTokenForJob(jobId.toString(), jobToken);
        LOG.info("Added token for " + jobId.toString());
    }

    private void recoverJobShuffleInfo(String jobIdStr, byte[] data) throws IOException {
        JobID jobId;
        try {
            jobId = JobID.forName((String)jobIdStr);
        }
        catch (IllegalArgumentException e) {
            throw new IOException("Bad job ID " + jobIdStr + " in state store", e);
        }
        ShuffleHandlerRecoveryProtos.JobShuffleInfoProto proto = ShuffleHandlerRecoveryProtos.JobShuffleInfoProto.parseFrom((byte[])data);
        String user = proto.getUser();
        SecurityProtos.TokenProto tokenProto = proto.getJobToken();
        Token jobToken = new Token(tokenProto.getIdentifier().toByteArray(), tokenProto.getPassword().toByteArray(), new Text(tokenProto.getKind()), new Text(tokenProto.getService()));
        this.addJobToken(jobId, user, (Token<JobTokenIdentifier>)jobToken);
    }

    private void recordJobShuffleInfo(JobID jobId, String user, Token<JobTokenIdentifier> jobToken) throws IOException {
        if (this.stateDb != null) {
            SecurityProtos.TokenProto.Builder builder = SecurityProtos.TokenProto.newBuilder();
            SecurityProtos.TokenProto.getDefaultInstance().getIdentifier();
            SecurityProtos.TokenProto.Builder builder2 = builder.setIdentifier(ByteString.copyFrom((byte[])jobToken.getIdentifier()));
            SecurityProtos.TokenProto.getDefaultInstance().getPassword();
            SecurityProtos.TokenProto tokenProto = builder2.setPassword(ByteString.copyFrom((byte[])jobToken.getPassword())).setKind(jobToken.getKind().toString()).setService(jobToken.getService().toString()).build();
            ShuffleHandlerRecoveryProtos.JobShuffleInfoProto proto = ShuffleHandlerRecoveryProtos.JobShuffleInfoProto.newBuilder().setUser(user).setJobToken(tokenProto).build();
            try {
                this.stateDb.put(JniDBFactory.bytes((String)jobId.toString()), proto.toByteArray());
            }
            catch (DBException e) {
                throw new IOException("Error storing " + jobId, e);
            }
        }
        this.addJobToken(jobId, user, jobToken);
    }

    private void removeJobShuffleInfo(JobID jobId) throws IOException {
        String jobIdStr = jobId.toString();
        this.getSecretManager().removeTokenForJob(jobIdStr);
        this.userRsrc.remove(jobIdStr);
        if (this.stateDb != null) {
            try {
                this.stateDb.delete(JniDBFactory.bytes((String)jobIdStr));
            }
            catch (DBException e) {
                throw new IOException("Unable to remove " + jobId + " from state store", e);
            }
        }
    }

    @ChannelHandler.Sharable
    class Shuffle
    extends ChannelInboundHandlerAdapter {
        private static final int MAX_WEIGHT = 0xA00000;
        private static final int EXPIRE_AFTER_ACCESS_MINUTES = 5;
        private static final int ALLOWED_CONCURRENCY = 16;
        private final Configuration conf;
        private final IndexCache indexCache;
        private int port;
        private final LoadingCache<AttemptPathIdentifier, AttemptPathInfo> pathCache = CacheBuilder.newBuilder().expireAfterAccess(5L, TimeUnit.MINUTES).softValues().concurrencyLevel(16).removalListener(new RemovalListener<AttemptPathIdentifier, AttemptPathInfo>(){

            @Override
            public void onRemoval(RemovalNotification<AttemptPathIdentifier, AttemptPathInfo> notification) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("PathCache Eviction: " + notification.getKey() + ", Reason=" + (Object)((Object)notification.getCause()));
                }
            }
        }).maximumWeight(0xA00000L).weigher(new Weigher<AttemptPathIdentifier, AttemptPathInfo>(){

            @Override
            public int weigh(AttemptPathIdentifier key, AttemptPathInfo value) {
                return key.jobId.length() + key.user.length() + key.attemptId.length() + value.indexPath.toString().length() + value.dataPath.toString().length();
            }
        }).build(new CacheLoader<AttemptPathIdentifier, AttemptPathInfo>(){

            @Override
            public AttemptPathInfo load(AttemptPathIdentifier key) throws Exception {
                String base = Shuffle.this.getBaseLocation(key.jobId, key.dagId, key.user);
                String attemptBase = base + key.attemptId;
                Path indexFileName = ShuffleHandler.this.getAuxiliaryLocalPathHandler().getLocalPathForRead(attemptBase + "/" + ShuffleHandler.INDEX_FILE_NAME);
                Path mapOutputFileName = ShuffleHandler.this.getAuxiliaryLocalPathHandler().getLocalPathForRead(attemptBase + "/" + ShuffleHandler.DATA_FILE_NAME);
                LOG.debug("Loaded : {} via loader", (Object)key);
                return new AttemptPathInfo(indexFileName, mapOutputFileName);
            }
        });

        public Shuffle(Configuration conf) {
            this.conf = conf;
            this.indexCache = new IndexCache(conf);
            this.port = conf.getInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 13563);
        }

        public void setPort(int port) {
            this.port = port;
        }

        private List<String> splitMaps(List<String> mapq) {
            if (null == mapq) {
                return null;
            }
            ArrayList<String> ret = new ArrayList<String>();
            for (String s : mapq) {
                Collections.addAll(ret, s.split(","));
            }
            return ret;
        }

        private Range splitReduces(List<String> reduceq) {
            int first;
            if (null == reduceq || reduceq.size() != 1) {
                return null;
            }
            String[] reduce = reduceq.get(0).split("-");
            int last = first = Integer.parseInt(reduce[0]);
            if (reduce.length > 1) {
                last = Integer.parseInt(reduce[1]);
            }
            return new Range(first, last);
        }

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            if (ShuffleHandler.this.maxShuffleConnections > 0 && ShuffleHandler.this.accepted.size() >= ShuffleHandler.this.maxShuffleConnections) {
                LOG.info(String.format("Current number of shuffle connections (%d) is greater than or equal to the max allowed shuffle connections (%d)", ShuffleHandler.this.accepted.size(), ShuffleHandler.this.maxShuffleConnections));
                ctx.channel().close();
                return;
            }
            ShuffleHandler.this.accepted.add(ctx.channel());
            super.channelActive(ctx);
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object message) throws Exception {
            HttpRequest request = (HttpRequest)message;
            this.handleRequest(ctx, request);
        }

        private void handleRequest(ChannelHandlerContext ctx, HttpRequest request) throws IOException, Exception {
            String dagId;
            String jobId;
            if (request.getMethod() != HttpMethod.GET) {
                this.sendError(ctx, HttpResponseStatus.METHOD_NOT_ALLOWED);
                return;
            }
            if (!"mapreduce".equals(request.headers().get("name")) || !"1.0.0".equals(request.headers().get("version"))) {
                this.sendError(ctx, "Incompatible shuffle request version", HttpResponseStatus.BAD_REQUEST);
                return;
            }
            Map<String, List<String>> q = new QueryStringDecoder(request.getUri()).parameters();
            List<String> keepAliveList = q.get("keepAlive");
            List<String> dagCompletedQ = q.get("dagAction");
            List<String> vertexCompletedQ = q.get("vertexAction");
            List<String> taskAttemptFailedQ = q.get("taskAttemptAction");
            boolean keepAliveParam = false;
            if (keepAliveList != null && keepAliveList.size() == 1) {
                keepAliveParam = Boolean.parseBoolean(keepAliveList.get(0));
                LOG.debug("KeepAliveParam : {} : {}", keepAliveList, (Object)keepAliveParam);
            }
            List<String> mapIds = this.splitMaps(q.get("map"));
            Range reduceRange = this.splitReduces(q.get("reduce"));
            List<String> jobQ = q.get("job");
            List<String> dagIdQ = q.get("dag");
            List<String> vertexIdQ = q.get("vertex");
            if (LOG.isDebugEnabled()) {
                LOG.debug("RECV: " + request.getUri() + "\n  mapId: " + mapIds + "\n  reduceId: " + reduceRange + "\n  jobId: " + jobQ + "\n  dagId: " + dagIdQ + "\n  keepAlive: " + keepAliveParam);
            }
            if (this.deleteDagDirectories(ctx.channel(), dagCompletedQ, jobQ, dagIdQ)) {
                return;
            }
            if (this.deleteVertexDirectories(ctx.channel(), vertexCompletedQ, jobQ, dagIdQ, vertexIdQ)) {
                return;
            }
            if (this.deleteTaskAttemptDirectories(ctx.channel(), taskAttemptFailedQ, jobQ, dagIdQ, mapIds)) {
                return;
            }
            if (mapIds == null || reduceRange == null || jobQ == null || dagIdQ == null) {
                this.sendError(ctx, "Required param job, dag, map and reduce", HttpResponseStatus.BAD_REQUEST);
                return;
            }
            if (jobQ.size() != 1) {
                this.sendError(ctx, "Too many job/reduce parameters", HttpResponseStatus.BAD_REQUEST);
                return;
            }
            if (AUDITLOG.isDebugEnabled()) {
                AUDITLOG.debug("shuffle for " + jobQ.get(0) + " mapper: " + mapIds + " reducer: " + reduceRange);
            }
            try {
                jobId = jobQ.get(0);
                dagId = dagIdQ.get(0);
            }
            catch (NumberFormatException e) {
                this.sendError(ctx, "Bad reduce parameter", HttpResponseStatus.BAD_REQUEST);
                return;
            }
            catch (IllegalArgumentException e) {
                this.sendError(ctx, "Bad job parameter", HttpResponseStatus.BAD_REQUEST);
                return;
            }
            String reqUri = request.getUri();
            if (null == reqUri) {
                this.sendError(ctx, HttpResponseStatus.FORBIDDEN);
                return;
            }
            DefaultHttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
            try {
                this.verifyRequest(jobId, ctx, request, response, new URL("http", "", this.port, reqUri));
            }
            catch (IOException e) {
                LOG.warn("Shuffle failure ", (Throwable)e);
                this.sendError(ctx, e.getMessage(), HttpResponseStatus.UNAUTHORIZED);
                return;
            }
            HashMap<String, MapOutputInfo> mapOutputInfoMap = new HashMap<String, MapOutputInfo>();
            Channel ch = ctx.channel();
            ChannelPipeline pipeline = ch.pipeline();
            TimeoutHandler timeoutHandler = (TimeoutHandler)pipeline.get(ShuffleHandler.TIMEOUT_HANDLER);
            timeoutHandler.setEnabledTimeout(false);
            String user = (String)ShuffleHandler.this.userRsrc.get(jobId);
            try {
                this.populateHeaders(mapIds, jobId, dagId, user, reduceRange, response, keepAliveParam, mapOutputInfoMap);
            }
            catch (DiskChecker.DiskErrorException e) {
                LOG.error("Shuffle error in populating headers (fatal: DiskErrorException):", (Throwable)e);
                String errorMessage = this.getErrorMessage(e);
                this.sendFakeShuffleHeaderWithError(ctx, (Object)((Object)ShuffleHandlerError.DISK_ERROR_EXCEPTION) + ": " + errorMessage, response);
                return;
            }
            catch (IOException e) {
                ch.write(response);
                LOG.error("Shuffle error in populating headers :", (Throwable)e);
                String errorMessage = this.getErrorMessage(e);
                this.sendError(ctx, errorMessage, HttpResponseStatus.INTERNAL_SERVER_ERROR);
                return;
            }
            ch.write(response);
            boolean keepAlive = keepAliveParam || ShuffleHandler.this.connectionKeepAliveEnabled;
            ReduceContext reduceContext = new ReduceContext(mapIds, reduceRange, ctx, user, mapOutputInfoMap, jobId, dagId, keepAlive);
            for (int i = 0; i < Math.min(ShuffleHandler.this.maxSessionOpenFiles, mapIds.size()); ++i) {
                ChannelFuture nextMap = this.sendMap(reduceContext);
                if (nextMap != null) continue;
                return;
            }
        }

        private boolean isNullOrEmpty(List<String> entries) {
            return entries == null || entries.isEmpty();
        }

        private boolean notEmptyAndContains(List<String> entries, String key) {
            if (entries == null || entries.isEmpty()) {
                return false;
            }
            return entries.get(0).contains(key);
        }

        private boolean deleteDagDirectories(Channel channel, List<String> dagCompletedQ, List<String> jobQ, List<String> dagIdQ) {
            if (jobQ == null || jobQ.isEmpty()) {
                return false;
            }
            if (this.notEmptyAndContains(dagCompletedQ, "delete") && !this.isNullOrEmpty(dagIdQ)) {
                String base = this.getDagLocation(jobQ.get(0), dagIdQ.get(0), (String)ShuffleHandler.this.userRsrc.get(jobQ.get(0)));
                try {
                    FileContext lfc = FileContext.getLocalFSFileContext();
                    for (Path dagPath : ShuffleHandler.this.getAuxiliaryLocalPathHandler().getAllLocalPathsForRead(base)) {
                        lfc.delete(dagPath, true);
                    }
                }
                catch (IOException e) {
                    LOG.warn("Encountered exception during dag delete " + e);
                }
                channel.writeAndFlush(new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK)).addListener(ChannelFutureListener.CLOSE);
                return true;
            }
            return false;
        }

        private boolean deleteVertexDirectories(Channel channel, List<String> vertexCompletedQ, List<String> jobQ, List<String> dagIdQ, List<String> vertexIdQ) {
            if (jobQ == null || jobQ.isEmpty()) {
                return false;
            }
            if (this.notEmptyAndContains(vertexCompletedQ, "delete") && !this.isNullOrEmpty(vertexIdQ)) {
                try {
                    this.deleteTaskDirsOfVertex(jobQ.get(0), dagIdQ.get(0), vertexIdQ.get(0), (String)ShuffleHandler.this.userRsrc.get(jobQ.get(0)));
                }
                catch (IOException e) {
                    LOG.warn("Encountered exception during vertex delete " + e);
                }
                channel.writeAndFlush(new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK)).addListener(ChannelFutureListener.CLOSE);
                return true;
            }
            return false;
        }

        private boolean deleteTaskAttemptDirectories(Channel channel, List<String> taskAttemptFailedQ, List<String> jobQ, List<String> dagIdQ, List<String> taskAttemptIdQ) {
            if (jobQ == null || jobQ.isEmpty()) {
                return false;
            }
            if (this.notEmptyAndContains(taskAttemptFailedQ, "delete") && !this.isNullOrEmpty(taskAttemptIdQ)) {
                for (String taskAttemptId : taskAttemptIdQ) {
                    String baseStr = this.getBaseLocation(jobQ.get(0), dagIdQ.get(0), (String)ShuffleHandler.this.userRsrc.get(jobQ.get(0)));
                    try {
                        FileSystem fs = FileSystem.getLocal((Configuration)this.conf).getRaw();
                        block3: for (Path basePath : ShuffleHandler.this.getAuxiliaryLocalPathHandler().getAllLocalPathsForRead(baseStr)) {
                            for (FileStatus fileStatus : fs.listStatus(basePath)) {
                                Path taskAttemptPath = fileStatus.getPath();
                                if (!taskAttemptPath.getName().startsWith(taskAttemptId) || !fs.delete(taskAttemptPath, true)) continue;
                                LOG.info("Deleted directory : " + taskAttemptPath);
                                this.indexCache.removeMap(taskAttemptPath.getName());
                                continue block3;
                            }
                        }
                    }
                    catch (IOException e) {
                        LOG.warn("Encountered exception during failed task attempt delete " + e);
                    }
                }
                channel.writeAndFlush(new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK)).addListener(ChannelFutureListener.CLOSE);
                return true;
            }
            return false;
        }

        public ChannelFuture sendMap(ReduceContext reduceContext) throws Exception {
            ChannelFuture nextMap = null;
            if (reduceContext.getMapsToSend().get() < reduceContext.getMapIds().size()) {
                int nextIndex = reduceContext.getMapsToSend().getAndIncrement();
                String mapId = reduceContext.getMapIds().get(nextIndex);
                try {
                    MapOutputInfo info = reduceContext.getInfoMap().get(mapId);
                    if (info == null) {
                        info = this.getMapOutputInfo(reduceContext.dagId, mapId, reduceContext.getReduceRange(), reduceContext.getJobId(), reduceContext.getUser());
                    }
                    if (null == (nextMap = this.sendMapOutput(reduceContext.getCtx(), reduceContext.getCtx().channel(), reduceContext.getUser(), mapId, reduceContext.getReduceRange(), info))) {
                        this.sendError(reduceContext.getCtx(), HttpResponseStatus.NOT_FOUND);
                        return null;
                    }
                    nextMap.addListener(new ReduceMapFileCount(reduceContext));
                }
                catch (IOException e) {
                    if (e instanceof DiskChecker.DiskErrorException) {
                        LOG.error("Shuffle error :" + e);
                    } else {
                        LOG.error("Shuffle error :", (Throwable)e);
                    }
                    String errorMessage = this.getErrorMessage(e);
                    this.sendError(reduceContext.getCtx(), errorMessage, HttpResponseStatus.INTERNAL_SERVER_ERROR);
                    return null;
                }
            }
            return nextMap;
        }

        private String getErrorMessage(Throwable t) {
            StringBuffer sb = new StringBuffer(t.getMessage());
            while (t.getCause() != null) {
                sb.append(t.getCause().getMessage());
                t = t.getCause();
            }
            return sb.toString();
        }

        private String getBaseLocation(String jobId, String dagId, String user) {
            String baseStr = this.getDagLocation(jobId, dagId, user) + "output" + "/";
            return baseStr;
        }

        private void deleteTaskDirsOfVertex(String jobId, String dagId, String vertexId, String user) throws IOException {
            String baseStr = this.getBaseLocation(jobId, dagId, user);
            FileContext lfc = FileContext.getLocalFSFileContext();
            for (Path dagPath : ShuffleHandler.this.getAuxiliaryLocalPathHandler().getAllLocalPathsForRead(baseStr)) {
                RemoteIterator status = lfc.listStatus(dagPath);
                JobID jobID = JobID.forName((String)jobId);
                String taskDirPrefix = String.format("attempt%s_%s_%s_", jobID.toString().replace("job", ""), dagId, vertexId);
                while (status.hasNext()) {
                    FileStatus fileStatus = (FileStatus)status.next();
                    Path attemptPath = fileStatus.getPath();
                    if (!attemptPath.getName().startsWith(taskDirPrefix) || !lfc.delete(attemptPath, true)) continue;
                    LOG.debug("deleted shuffle data in task directory: {}", (Object)attemptPath);
                }
            }
        }

        private String getDagLocation(String jobId, String dagId, String user) {
            JobID jobID = JobID.forName((String)jobId);
            ApplicationId appID = ApplicationId.newInstance((long)Long.parseLong(jobID.getJtIdentifier()), (int)jobID.getId());
            String dagStr = "usercache/" + user + "/" + ShuffleHandler.APPCACHE + "/" + appID.toString() + "/" + "dag_" + dagId + "/";
            return dagStr;
        }

        protected MapOutputInfo getMapOutputInfo(String dagId, String mapId, Range reduceRange, String jobId, String user) throws IOException {
            AttemptPathInfo pathInfo;
            try {
                AttemptPathIdentifier identifier = new AttemptPathIdentifier(jobId, dagId, user, mapId);
                pathInfo = this.pathCache.get(identifier);
                LOG.debug("Retrieved pathInfo for {} check for corresponding loaded messages to determine whether it was loaded or cached", (Object)identifier);
            }
            catch (ExecutionException e) {
                if (e.getCause() instanceof IOException) {
                    throw (IOException)e.getCause();
                }
                throw new RuntimeException(e.getCause());
            }
            TezSpillRecord spillRecord = this.indexCache.getSpillRecord(mapId, pathInfo.indexPath, user);
            if (LOG.isDebugEnabled()) {
                LOG.debug("getMapOutputInfo: jobId=" + jobId + ", mapId=" + mapId + ",dataFile=" + pathInfo.dataPath + ", indexFile=" + pathInfo.indexPath);
            }
            MapOutputInfo outputInfo = reduceRange.first == reduceRange.last ? new MapOutputInfo(pathInfo.dataPath, spillRecord.getIndex(reduceRange.first), reduceRange) : new MapOutputInfo(pathInfo.dataPath, spillRecord, reduceRange);
            return outputInfo;
        }

        protected void populateHeaders(List<String> mapIds, String jobId, String dagId, String user, Range reduceRange, HttpResponse response, boolean keepAliveParam, Map<String, MapOutputInfo> mapOutputInfoMap) throws IOException {
            long contentLength = 0L;
            if (ShuffleHandler.this.connectionKeepAliveEnabled || keepAliveParam) {
                contentLength = this.getContentLength(mapIds, jobId, dagId, user, reduceRange, mapOutputInfoMap);
            }
            this.setResponseHeaders(response, keepAliveParam, contentLength);
        }

        long getContentLength(List<String> mapIds, String jobId, String dagId, String user, Range reduceRange, Map<String, MapOutputInfo> mapOutputInfoMap) throws IOException {
            long contentLength = 0L;
            int reduceCountVSize = WritableUtils.getVIntSize((long)(reduceRange.getLast() - reduceRange.getFirst() + 1));
            for (String mapId : mapIds) {
                contentLength += (long)reduceCountVSize;
                MapOutputInfo outputInfo = this.getMapOutputInfo(dagId, mapId, reduceRange, jobId, user);
                if (mapOutputInfoMap.size() < ShuffleHandler.this.mapOutputMetaInfoCacheSize) {
                    mapOutputInfoMap.put(mapId, outputInfo);
                }
                for (int reduce = reduceRange.getFirst(); reduce <= reduceRange.getLast(); ++reduce) {
                    TezIndexRecord indexRecord = outputInfo.getIndex(reduce);
                    ShuffleHeader header = new ShuffleHeader(mapId, indexRecord.getPartLength(), indexRecord.getRawLength(), reduce);
                    contentLength += (long)header.writeLength();
                    contentLength += indexRecord.getPartLength();
                }
            }
            return contentLength;
        }

        protected void setResponseHeaders(HttpResponse response, boolean keepAliveParam, long contentLength) {
            if (ShuffleHandler.this.connectionKeepAliveEnabled || keepAliveParam) {
                response.headers().set("Content-Length", (Object)String.valueOf(contentLength));
                response.headers().set("Connection", (Object)"keep-alive");
                response.headers().set("keep-alive", (Object)("timeout=" + ShuffleHandler.this.connectionKeepAliveTimeOut));
                LOG.debug("Content Length in shuffle : {}", (Object)contentLength);
            } else {
                LOG.debug("Setting connection close header...");
                response.headers().set("Connection", (Object)ShuffleHandler.CONNECTION_CLOSE);
            }
        }

        protected void verifyRequest(String appid, ChannelHandlerContext ctx, HttpRequest request, HttpResponse response, URL requestUri) throws IOException {
            SecretKey tokenSecret = ShuffleHandler.this.getSecretManager().retrieveTokenSecret(appid);
            if (null == tokenSecret) {
                LOG.info("Request for unknown token " + appid);
                throw new IOException("could not find jobid");
            }
            String enc_str = SecureShuffleUtils.buildMsgFrom(requestUri);
            String urlHashStr = request.headers().get("UrlHash");
            if (urlHashStr == null) {
                LOG.info("Missing header hash for " + appid);
                throw new IOException("fetcher cannot be authenticated");
            }
            if (LOG.isDebugEnabled()) {
                int len = urlHashStr.length();
                LOG.debug("verifying request. enc_str=" + enc_str + "; hash=..." + urlHashStr.substring(len - len / 2, len - 1));
            }
            SecureShuffleUtils.verifyReply(urlHashStr, enc_str, tokenSecret);
            String reply = SecureShuffleUtils.generateHash(urlHashStr.getBytes(Charsets.UTF_8), tokenSecret);
            response.headers().set("ReplyHash", (Object)reply);
            response.headers().set("name", (Object)"mapreduce");
            response.headers().set("version", (Object)"1.0.0");
            if (LOG.isDebugEnabled()) {
                int len = reply.length();
                LOG.debug("Fetcher request verfied. enc_str=" + enc_str + ";reply=" + reply.substring(len - len / 2, len - 1));
            }
        }

        protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx, Channel ch, String user, String mapId, Range reduceRange, MapOutputInfo outputInfo) throws IOException {
            ChannelFuture writeFuture;
            RandomAccessFile spill;
            TezIndexRecord firstIndex = null;
            TezIndexRecord lastIndex = null;
            DataOutputBuffer dobRange = new DataOutputBuffer();
            WritableUtils.writeVInt((DataOutput)dobRange, (int)(reduceRange.getLast() - reduceRange.getFirst() + 1));
            ch.writeAndFlush(Unpooled.wrappedBuffer(dobRange.getData(), 0, dobRange.getLength()));
            for (int reduce = reduceRange.getFirst(); reduce <= reduceRange.getLast(); ++reduce) {
                TezIndexRecord index = outputInfo.getIndex(reduce);
                if (index.getPartLength() != 0L) {
                    if (firstIndex == null) {
                        firstIndex = index;
                    }
                    lastIndex = index;
                }
                ShuffleHeader header = new ShuffleHeader(mapId, index.getPartLength(), index.getRawLength(), reduce);
                DataOutputBuffer dob = new DataOutputBuffer();
                header.write((DataOutput)dob);
                ch.writeAndFlush(Unpooled.wrappedBuffer(dob.getData(), 0, dob.getLength()));
            }
            outputInfo.finish();
            long rangeOffset = firstIndex.getStartOffset();
            long rangePartLength = lastIndex.getStartOffset() + lastIndex.getPartLength() - firstIndex.getStartOffset();
            File spillFile = new File(outputInfo.mapOutputFileName.toString());
            try {
                spill = SecureIOUtils.openForRandomRead((File)spillFile, (String)"r", (String)user, null);
            }
            catch (FileNotFoundException e) {
                LOG.info(spillFile + " not found");
                return null;
            }
            if (ch.pipeline().get(SslHandler.class) == null) {
                FadvisedFileRegion partition = new FadvisedFileRegion(spill, rangeOffset, rangePartLength, ShuffleHandler.this.manageOsCache, ShuffleHandler.this.readaheadLength, ShuffleHandler.this.readaheadPool, spillFile.getAbsolutePath(), ShuffleHandler.this.shuffleBufferSize, ShuffleHandler.this.shuffleTransferToAllowed);
                writeFuture = ch.writeAndFlush(partition);
            } else {
                FadvisedChunkedFile chunk = new FadvisedChunkedFile(spill, rangeOffset, rangePartLength, ShuffleHandler.this.sslFileBufferSize, ShuffleHandler.this.manageOsCache, ShuffleHandler.this.readaheadLength, ShuffleHandler.this.readaheadPool, spillFile.getAbsolutePath());
                writeFuture = ch.writeAndFlush(chunk);
            }
            ShuffleHandler.this.metrics.shuffleConnections.incr();
            ShuffleHandler.this.metrics.shuffleOutputBytes.incr(rangePartLength);
            return writeFuture;
        }

        protected void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) {
            this.sendError(ctx, "", status);
        }

        protected void sendError(ChannelHandlerContext ctx, String message, HttpResponseStatus status) {
            DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status);
            this.sendError(ctx, message, (FullHttpResponse)response);
        }

        protected void sendError(ChannelHandlerContext ctx, String message, FullHttpResponse response) {
            this.sendError(ctx, Unpooled.copiedBuffer(message, CharsetUtil.UTF_8), response);
        }

        private void sendFakeShuffleHeaderWithError(ChannelHandlerContext ctx, String message, HttpResponse response) throws IOException {
            DefaultFullHttpResponse fullResponse = new DefaultFullHttpResponse(response.getProtocolVersion(), response.getStatus());
            fullResponse.headers().set(response.headers());
            ShuffleHeader header = new ShuffleHeader(message, -1L, -1L, -1);
            DataOutputBuffer out = new DataOutputBuffer();
            header.write((DataOutput)out);
            this.sendError(ctx, Unpooled.wrappedBuffer(out.getData(), 0, out.getLength()), (FullHttpResponse)fullResponse);
        }

        protected void sendError(ChannelHandlerContext ctx, ByteBuf content, FullHttpResponse response) {
            response.headers().set("Content-Type", (Object)"text/plain; charset=UTF-8");
            response.headers().set("name", (Object)"mapreduce");
            response.headers().set("version", (Object)"1.0.0");
            response.content().writeBytes(content);
            ctx.channel().writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
            content.release();
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            if (cause instanceof TooLongFrameException) {
                this.sendError(ctx, HttpResponseStatus.BAD_REQUEST);
                return;
            }
            if (cause instanceof IOException) {
                if (cause instanceof ClosedChannelException) {
                    LOG.debug("Ignoring closed channel error", cause);
                    return;
                }
                String message = String.valueOf(cause.getMessage());
                if (IGNORABLE_ERROR_MESSAGE.matcher(message).matches()) {
                    LOG.debug("Ignoring client socket close", cause);
                    return;
                }
            }
            LOG.error("Shuffle error: ", cause);
            if (ctx.channel().isActive()) {
                LOG.error("Shuffle error", cause);
                this.sendError(ctx, HttpResponseStatus.INTERNAL_SERVER_ERROR);
            }
        }

        class MapOutputInfo {
            private final Path mapOutputFileName;
            private TezSpillRecord spillRecord;
            private TezIndexRecord indexRecord;
            private final Range reduceRange;

            MapOutputInfo(Path mapOutputFileName, TezIndexRecord indexRecord, Range reduceRange) {
                this.mapOutputFileName = mapOutputFileName;
                this.indexRecord = indexRecord;
                this.reduceRange = reduceRange;
            }

            MapOutputInfo(Path mapOutputFileName, TezSpillRecord spillRecord, Range reduceRange) {
                this.mapOutputFileName = mapOutputFileName;
                this.spillRecord = spillRecord;
                this.reduceRange = reduceRange;
            }

            TezIndexRecord getIndex(int index) {
                if (index < this.reduceRange.first || index > this.reduceRange.last) {
                    throw new IllegalArgumentException("Reduce Index: " + index + " out of range for " + this.mapOutputFileName);
                }
                if (this.spillRecord != null) {
                    return this.spillRecord.getIndex(index);
                }
                return this.indexRecord;
            }

            public void finish() {
                this.spillRecord = null;
                this.indexRecord = null;
            }
        }
    }

    @Metrics(about="Shuffle output metrics", context="mapred", name="tez")
    static class ShuffleMetrics
    implements ChannelFutureListener {
        @Metric(value={"Shuffle output in bytes"})
        MutableCounterLong shuffleOutputBytes;
        @Metric(value={"# of failed shuffle outputs"})
        MutableCounterInt shuffleOutputsFailed;
        @Metric(value={"# of succeeded shuffle outputs"})
        MutableCounterInt shuffleOutputsOK;
        @Metric(value={"# of current shuffle connections"})
        MutableGaugeInt shuffleConnections;

        ShuffleMetrics() {
        }

        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            if (future.isSuccess()) {
                this.shuffleOutputsOK.incr();
            } else {
                this.shuffleOutputsFailed.incr();
            }
            this.shuffleConnections.decr();
        }
    }

    private static class LevelDBLogger
    implements Logger {
        private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(LevelDBLogger.class);

        private LevelDBLogger() {
        }

        public void log(String message) {
            LOG.info(message);
        }
    }

    static class AttemptPathIdentifier {
        private final String jobId;
        private final String dagId;
        private final String user;
        private final String attemptId;

        public AttemptPathIdentifier(String jobId, String dagID, String user, String attemptId) {
            this.jobId = jobId;
            this.dagId = dagID;
            this.user = user;
            this.attemptId = attemptId;
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            AttemptPathIdentifier that = (AttemptPathIdentifier)o;
            if (!this.attemptId.equals(that.attemptId)) {
                return false;
            }
            return this.jobId.equals(that.jobId);
        }

        public int hashCode() {
            int result = this.jobId.hashCode();
            result = 31 * result + this.attemptId.hashCode();
            return result;
        }

        public String toString() {
            return "AttemptPathIdentifier{jobId='" + this.jobId + '\'' + ", dagId='" + this.dagId + '\'' + ", user='" + this.user + '\'' + ", attemptId='" + this.attemptId + '\'' + '}';
        }
    }

    static class AttemptPathInfo {
        private final Path indexPath;
        private final Path dataPath;

        public AttemptPathInfo(Path indexPath, Path dataPath) {
            this.indexPath = indexPath;
            this.dataPath = dataPath;
        }
    }

    protected static class Range {
        final int first;
        final int last;

        Range(int first, int last) {
            this.first = first;
            this.last = last;
        }

        int getFirst() {
            return this.first;
        }

        int getLast() {
            return this.last;
        }

        public String toString() {
            return "range: " + this.first + "-" + this.last;
        }
    }

    static class TimeoutHandler
    extends ChannelDuplexHandler {
        private boolean enabledTimeout;

        TimeoutHandler() {
        }

        void setEnabledTimeout(boolean enabledTimeout) {
            this.enabledTimeout = enabledTimeout;
        }

        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            IdleStateEvent e;
            if (evt instanceof IdleStateEvent && (e = (IdleStateEvent)evt).state() == IdleState.WRITER_IDLE && this.enabledTimeout) {
                ctx.channel().close();
            }
        }
    }

    private static class ReduceContext {
        private List<String> mapIds;
        private AtomicInteger mapsToWait;
        private AtomicInteger mapsToSend;
        private Range reduceRange;
        private ChannelHandlerContext ctx;
        private String user;
        private Map<String, Shuffle.MapOutputInfo> infoMap;
        private String jobId;
        private String dagId;
        private final boolean keepAlive;

        public ReduceContext(List<String> mapIds, Range reduceRange, ChannelHandlerContext context, String usr, Map<String, Shuffle.MapOutputInfo> mapOutputInfoMap, String jobId, String dagId, boolean keepAlive) {
            this.mapIds = mapIds;
            this.reduceRange = reduceRange;
            this.dagId = dagId;
            this.mapsToWait = new AtomicInteger(mapIds.size());
            this.mapsToSend = new AtomicInteger(0);
            this.ctx = context;
            this.user = usr;
            this.infoMap = mapOutputInfoMap;
            this.jobId = jobId;
            this.keepAlive = keepAlive;
        }

        public Range getReduceRange() {
            return this.reduceRange;
        }

        public ChannelHandlerContext getCtx() {
            return this.ctx;
        }

        public String getUser() {
            return this.user;
        }

        public Map<String, Shuffle.MapOutputInfo> getInfoMap() {
            return this.infoMap;
        }

        public String getJobId() {
            return this.jobId;
        }

        public List<String> getMapIds() {
            return this.mapIds;
        }

        public AtomicInteger getMapsToSend() {
            return this.mapsToSend;
        }

        public AtomicInteger getMapsToWait() {
            return this.mapsToWait;
        }

        public boolean getKeepAlive() {
            return this.keepAlive;
        }
    }

    class ReduceMapFileCount
    implements ChannelFutureListener {
        private ReduceContext reduceContext;

        public ReduceMapFileCount(ReduceContext rc) {
            this.reduceContext = rc;
        }

        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            Channel ch = future.channel();
            if (!future.isSuccess()) {
                ch.close();
                return;
            }
            int waitCount = this.reduceContext.getMapsToWait().decrementAndGet();
            if (waitCount == 0) {
                LOG.debug("Finished with all map outputs");
                ch.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
                ShuffleHandler.this.metrics.operationComplete(future);
                if (this.reduceContext.getKeepAlive()) {
                    ChannelPipeline pipeline = ch.pipeline();
                    TimeoutHandler timeoutHandler = (TimeoutHandler)pipeline.get(ShuffleHandler.TIMEOUT_HANDLER);
                    timeoutHandler.setEnabledTimeout(true);
                } else {
                    ch.close();
                }
            } else {
                ShuffleHandler.this.SHUFFLE.sendMap(this.reduceContext);
            }
        }
    }
}

