/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.tools.dynamometer.workloadgenerator.audit;

import java.io.IOException;
import java.net.URI;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.counters.GenericCounter;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.thirdparty.com.google.common.base.Splitter;
import org.apache.hadoop.tools.dynamometer.workloadgenerator.audit.AuditReplayCommand;
import org.apache.hadoop.tools.dynamometer.workloadgenerator.audit.AuditReplayMapper;
import org.apache.hadoop.tools.dynamometer.workloadgenerator.audit.CountTimeWritable;
import org.apache.hadoop.tools.dynamometer.workloadgenerator.audit.UserCommandKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Worker thread that takes {@link AuditReplayCommand}s from a {@link DelayQueue}
 * and replays each one against the file system identified by the {@code nn_uri}
 * configuration value, acting as a proxy user for the user named in the command.
 *
 * <p>Counters and per-user latency totals are accumulated in thread-local maps
 * and handed to the MapReduce context only when {@link #drainCounters} and
 * {@link #drainCommandLatencies} are called. The thread stops when it takes a
 * command whose {@code isPoison()} is true, when it is interrupted, or when an
 * unexpected exception escapes a replay (see {@link #getException()}).
 */
public class AuditReplayThread extends Thread {
    private static final Logger LOG = LoggerFactory.getLogger(AuditReplayThread.class);

    /** A command is counted as late once its remaining delay is below minus this many ms. */
    private static final long LATE_TOLERANCE_MS = 5L;

    private final DelayQueue<AuditReplayCommand> commandQueue;
    private final ConcurrentMap<String, FileSystem> fsCache;
    private final URI namenodeUri;
    private final UserGroupInformation loginUser;
    private final Configuration mapperConf;
    private final long startTimestampMs;
    private final boolean createBlocks;
    private final Map<AuditReplayMapper.REPLAYCOUNTERS, Counter> replayCountersMap = new HashMap<>();
    private final Map<String, Counter> individualCommandsMap = new HashMap<>();
    private final Map<UserCommandKey, CountTimeWritable> commandLatencyMap = new HashMap<>();

    /** Set by {@link #run()} when a non-interrupt exception terminates the thread. */
    private Exception exception;

    /**
     * Creates a replay thread and zero-initialises its local counters.
     *
     * @param mapperContext context whose configuration supplies {@code nn_uri},
     *     {@code start_timestamp_ms} (default -1) and
     *     {@code auditreplay.create-blocks} (default true)
     * @param queue queue from which commands are taken
     * @param fsCache per-user file system cache; presumably shared with sibling
     *     threads, given its concurrent type
     * @throws IOException if the login user cannot be determined
     */
    AuditReplayThread(Mapper.Context mapperContext, DelayQueue<AuditReplayCommand> queue, ConcurrentMap<String, FileSystem> fsCache) throws IOException {
        this.commandQueue = queue;
        this.fsCache = fsCache;
        this.loginUser = UserGroupInformation.getLoginUser();
        this.mapperConf = mapperContext.getConfiguration();
        this.namenodeUri = URI.create(this.mapperConf.get("nn_uri"));
        this.startTimestampMs = this.mapperConf.getLong("start_timestamp_ms", -1L);
        this.createBlocks = this.mapperConf.getBoolean("auditreplay.create-blocks", true);
        LOG.info("Start timestamp: {}", this.startTimestampMs);
        for (AuditReplayMapper.REPLAYCOUNTERS counter : AuditReplayMapper.REPLAYCOUNTERS.values()) {
            this.replayCountersMap.put(counter, new GenericCounter());
        }
        for (AuditReplayMapper.ReplayCommand replayCommand : AuditReplayMapper.ReplayCommand.values()) {
            this.individualCommandsMap.put(replayCommand + "_COUNT", new GenericCounter());
            this.individualCommandsMap.put(replayCommand + "_LATENCY", new GenericCounter());
            this.individualCommandsMap.put(replayCommand + "_INVALID", new GenericCounter());
        }
    }

    /**
     * Adds every locally accumulated counter value to the matching counter of
     * {@code context}. Per-command counters go to the {@code INDIVIDUAL_COMMANDS} group.
     *
     * @param context MapReduce context whose counters are incremented
     */
    void drainCounters(Mapper.Context context) {
        for (Map.Entry<AuditReplayMapper.REPLAYCOUNTERS, Counter> entry : this.replayCountersMap.entrySet()) {
            context.getCounter(entry.getKey()).increment(entry.getValue().getValue());
        }
        for (Map.Entry<String, Counter> entry : this.individualCommandsMap.entrySet()) {
            context.getCounter("INDIVIDUAL_COMMANDS", entry.getKey()).increment(entry.getValue().getValue());
        }
    }

    /**
     * Writes one record per (user, command, type) key holding its accumulated
     * count and total latency.
     *
     * @param context MapReduce context to write to
     * @throws InterruptedException if the write is interrupted
     * @throws IOException if the write fails
     */
    @SuppressWarnings("unchecked") // Mapper.Context is a raw type in this signature.
    void drainCommandLatencies(Mapper.Context context) throws InterruptedException, IOException {
        for (Map.Entry<UserCommandKey, CountTimeWritable> entry : this.commandLatencyMap.entrySet()) {
            context.write(entry.getKey(), entry.getValue());
        }
    }

    /**
     * Enqueues a command for replay by this thread.
     *
     * @param cmd command to enqueue
     */
    void addToQueue(AuditReplayCommand cmd) {
        this.commandQueue.put(cmd);
    }

    /**
     * Returns the exception that terminated {@link #run()}, if any.
     *
     * @return the recorded exception, or {@code null} if none was recorded
     */
    Exception getException() {
        return this.exception;
    }

    /**
     * Waits until the configured start timestamp, then replays queued commands
     * until a poison command is taken. Interruption ends the loop; any other
     * exception is stored for {@link #getException()} and also ends the loop.
     */
    @Override
    public void run() {
        long delay = this.startTimestampMs - System.currentTimeMillis();
        try {
            if (delay > 0L) {
                LOG.info("Sleeping for {} ms", delay);
                Thread.sleep(delay);
            } else {
                LOG.warn("Starting late by {} ms", -delay);
            }
            AuditReplayCommand cmd = this.commandQueue.take();
            while (!cmd.isPoison()) {
                incrementCounter(AuditReplayMapper.REPLAYCOUNTERS.TOTALCOMMANDS, 1L);
                delay = cmd.getDelay(TimeUnit.MILLISECONDS);
                // A negative remaining delay means the command is being replayed after its due time.
                if (delay < -LATE_TOLERANCE_MS) {
                    incrementCounter(AuditReplayMapper.REPLAYCOUNTERS.LATECOMMANDS, 1L);
                    incrementCounter(AuditReplayMapper.REPLAYCOUNTERS.LATECOMMANDSTOTALTIME, -delay);
                }
                if (!replayLog(cmd)) {
                    incrementCounter(AuditReplayMapper.REPLAYCOUNTERS.TOTALINVALIDCOMMANDS, 1L);
                }
                cmd = this.commandQueue.take();
            }
        } catch (InterruptedException e) {
            LOG.error("Interrupted; exiting from thread.", e);
            // Restore the flag so code observing this thread can still see the interruption.
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            this.exception = e;
            LOG.error("ReplayThread encountered exception; exiting.", e);
        }
    }

    /**
     * Replays a single command and records its latency.
     *
     * @param command command to replay
     * @return {@code true} if the command ran without an {@link IOException};
     *     {@code false} if its name is not a known {@code ReplayCommand} or the
     *     file system call threw an {@link IOException}
     */
    private boolean replayLog(AuditReplayCommand command) {
        String user = command.getSimpleUgi();
        FileSystem fs = getFileSystemFor(user);

        AuditReplayMapper.ReplayCommand replayCommand;
        try {
            // Locale.ROOT keeps the upper-casing independent of the JVM default locale.
            String commandName = command.getCommand().split(" ")[0].toUpperCase(Locale.ROOT);
            replayCommand = AuditReplayMapper.ReplayCommand.valueOf(commandName);
        } catch (IllegalArgumentException iae) {
            LOG.warn("Unsupported/invalid command: {}", command);
            incrementCounter(AuditReplayMapper.REPLAYCOUNTERS.TOTALUNSUPPORTEDCOMMANDS, 1L);
            return false;
        }

        try {
            long startTime = System.currentTimeMillis();
            executeCommand(fs, replayCommand, command.getSrc(), command.getDest());
            long latency = System.currentTimeMillis() - startTime;
            recordLatency(user, replayCommand, latency);
            return true;
        } catch (IOException e) {
            LOG.debug("IOException: {}", e.getLocalizedMessage());
            this.individualCommandsMap.get(replayCommand + "_INVALID").increment(1L);
            return false;
        }
    }

    /**
     * Returns the cached file system for {@code user}, creating and caching a
     * new one as a proxy user of the login user when none is cached yet.
     */
    private FileSystem getFileSystemFor(String user) {
        FileSystem cached = this.fsCache.get(user);
        if (cached != null) {
            return cached;
        }
        UserGroupInformation ugi = UserGroupInformation.createProxyUser(user, this.loginUser);
        // The explicit cast selects the PrivilegedAction overload of doAs; a bare
        // lambda also matches the PrivilegedExceptionAction overload.
        FileSystem created = ugi.doAs((PrivilegedAction<FileSystem>) () -> {
            try {
                DistributedFileSystem dfs = new DistributedFileSystem();
                dfs.initialize(this.namenodeUri, this.mapperConf);
                return dfs;
            } catch (IOException ioe) {
                throw new RuntimeException(ioe);
            }
        });
        // Another thread may have cached an instance for this user in the meantime;
        // keep that one and release ours rather than overwriting (and leaking) it.
        FileSystem winner = this.fsCache.putIfAbsent(user, created);
        if (winner == null) {
            return created;
        }
        try {
            created.close();
        } catch (IOException e) {
            LOG.debug("Failed to close redundant file system for user {}", user, e);
        }
        return winner;
    }

    /** Issues the file system call that corresponds to {@code replayCommand}. */
    private void executeCommand(FileSystem fs, AuditReplayMapper.ReplayCommand replayCommand, String src, String dst) throws IOException {
        switch (replayCommand) {
            case CREATE: {
                // try-with-resources closes the stream even when the write fails.
                try (FSDataOutputStream out = fs.create(new Path(src))) {
                    if (this.createBlocks) {
                        out.writeByte(0);
                    }
                }
                break;
            }
            case GETFILEINFO: {
                fs.getFileStatus(new Path(src));
                break;
            }
            case CONTENTSUMMARY: {
                fs.getContentSummary(new Path(src));
                break;
            }
            case MKDIRS: {
                fs.mkdirs(new Path(src));
                break;
            }
            case RENAME: {
                fs.rename(new Path(src), new Path(dst));
                break;
            }
            case LISTSTATUS: {
                ((DistributedFileSystem) fs).getClient().listPaths(src, HdfsFileStatus.EMPTY_NAME);
                break;
            }
            case APPEND: {
                // Close the returned stream so it is not left open, and fall through to
                // the normal latency accounting like every other command.
                fs.append(new Path(src)).close();
                break;
            }
            case DELETE: {
                fs.delete(new Path(src), true);
                break;
            }
            case OPEN: {
                fs.open(new Path(src)).close();
                break;
            }
            case SETPERMISSION: {
                fs.setPermission(new Path(src), FsPermission.getDefault());
                break;
            }
            case SETOWNER: {
                UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
                fs.setOwner(new Path(src), currentUser.getShortUserName(), currentUser.getPrimaryGroupName());
                break;
            }
            case SETTIMES: {
                long now = System.currentTimeMillis();
                fs.setTimes(new Path(src), now, now);
                break;
            }
            case SETREPLICATION: {
                fs.setReplication(new Path(src), (short) 1);
                break;
            }
            case CONCAT: {
                // dst is a comma-separated list wrapped in one leading and one trailing
                // character, which are stripped before splitting.
                String bareDist = dst.length() < 2 ? "" : dst.substring(1, dst.length() - 1).trim();
                List<Path> dsts = new ArrayList<>();
                for (String s : Splitter.on(",").omitEmptyStrings().trimResults().split(bareDist)) {
                    dsts.add(new Path(s));
                }
                fs.concat(new Path(src), dsts.toArray(new Path[0]));
                break;
            }
            default: {
                throw new RuntimeException("Unexpected command: " + replayCommand);
            }
        }
    }

    /** Adds one successful execution with the given latency to all local statistics. */
    private void recordLatency(String user, AuditReplayMapper.ReplayCommand replayCommand, long latency) {
        UserCommandKey userCommandKey = new UserCommandKey(user, replayCommand.toString(), replayCommand.getType().toString());
        CountTimeWritable latencyWritable = this.commandLatencyMap.computeIfAbsent(userCommandKey, key -> new CountTimeWritable());
        latencyWritable.setCount(latencyWritable.getCount() + 1L);
        latencyWritable.setTime(latencyWritable.getTime() + latency);
        switch (replayCommand.getType()) {
            case WRITE: {
                incrementCounter(AuditReplayMapper.REPLAYCOUNTERS.TOTALWRITECOMMANDLATENCY, latency);
                incrementCounter(AuditReplayMapper.REPLAYCOUNTERS.TOTALWRITECOMMANDS, 1L);
                break;
            }
            case READ: {
                incrementCounter(AuditReplayMapper.REPLAYCOUNTERS.TOTALREADCOMMANDLATENCY, latency);
                incrementCounter(AuditReplayMapper.REPLAYCOUNTERS.TOTALREADCOMMANDS, 1L);
                break;
            }
            default: {
                throw new RuntimeException("Unexpected command type: " + replayCommand.getType());
            }
        }
        this.individualCommandsMap.get(replayCommand + "_LATENCY").increment(latency);
        this.individualCommandsMap.get(replayCommand + "_COUNT").increment(1L);
    }

    /** Adds {@code amount} to the local value of {@code counter}. */
    private void incrementCounter(AuditReplayMapper.REPLAYCOUNTERS counter, long amount) {
        this.replayCountersMap.get(counter).increment(amount);
    }
}

