/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.tools.dynamometer.workloadgenerator.audit;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.tools.dynamometer.workloadgenerator.WorkloadMapper;
import org.apache.hadoop.tools.dynamometer.workloadgenerator.audit.AuditCommandParser;
import org.apache.hadoop.tools.dynamometer.workloadgenerator.audit.AuditLogDirectParser;
import org.apache.hadoop.tools.dynamometer.workloadgenerator.audit.AuditReplayCommand;
import org.apache.hadoop.tools.dynamometer.workloadgenerator.audit.AuditReplayReducer;
import org.apache.hadoop.tools.dynamometer.workloadgenerator.audit.AuditReplayThread;
import org.apache.hadoop.tools.dynamometer.workloadgenerator.audit.CountTimeWritable;
import org.apache.hadoop.tools.dynamometer.workloadgenerator.audit.NoSplitTextInputFormat;
import org.apache.hadoop.tools.dynamometer.workloadgenerator.audit.UserCommandKey;
import org.apache.hadoop.util.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AuditReplayMapper
extends WorkloadMapper<LongWritable, Text, UserCommandKey, CountTimeWritable> {
    public static final String INPUT_PATH_KEY = "auditreplay.input-path";
    public static final String OUTPUT_PATH_KEY = "auditreplay.output-path";
    public static final String NUM_THREADS_KEY = "auditreplay.num-threads";
    public static final int NUM_THREADS_DEFAULT = 1;
    public static final String CREATE_BLOCKS_KEY = "auditreplay.create-blocks";
    public static final boolean CREATE_BLOCKS_DEFAULT = true;
    public static final String RATE_FACTOR_KEY = "auditreplay.rate-factor";
    public static final double RATE_FACTOR_DEFAULT = 1.0;
    public static final String COMMAND_PARSER_KEY = "auditreplay.command-parser.class";
    public static final Class<AuditLogDirectParser> COMMAND_PARSER_DEFAULT = AuditLogDirectParser.class;
    private static final Logger LOG = LoggerFactory.getLogger(AuditReplayMapper.class);
    private static final long MAX_READAHEAD_MS = 60000L;
    public static final String INDIVIDUAL_COMMANDS_COUNTER_GROUP = "INDIVIDUAL_COMMANDS";
    public static final String INDIVIDUAL_COMMANDS_LATENCY_SUFFIX = "_LATENCY";
    public static final String INDIVIDUAL_COMMANDS_INVALID_SUFFIX = "_INVALID";
    public static final String INDIVIDUAL_COMMANDS_COUNT_SUFFIX = "_COUNT";
    private long startTimestampMs;
    private int numThreads;
    private double rateFactor;
    private long highestTimestamp;
    private Long highestSequence;
    private List<AuditReplayThread> threads;
    private DelayQueue<AuditReplayCommand> commandQueue;
    private Function<Long, Long> relativeToAbsoluteTimestamp;
    private AuditCommandParser commandParser;
    private ScheduledThreadPoolExecutor progressExecutor;

    @Override
    public String getDescription() {
        return "This mapper replays audit log files.";
    }

    @Override
    public List<String> getConfigDescriptions() {
        return Lists.newArrayList((Object[])new String[]{"auditreplay.input-path (required): Path to directory containing input files.", "auditreplay.output-path (required): Path to destination for output files.", "auditreplay.num-threads (default 1): Number of threads to use per mapper for replay.", "auditreplay.create-blocks (default true): Whether or not to create 1-byte blocks when performing `create` commands.", "auditreplay.rate-factor (default 1.0): Multiplicative speed at which to replay the audit log; e.g. a value of 2.0 would make the replay occur at twice the original speed. This can be useful to induce heavier loads."});
    }

    @Override
    public boolean verifyConfigurations(Configuration conf) {
        return conf.get(INPUT_PATH_KEY) != null && conf.get(OUTPUT_PATH_KEY) != null;
    }

    public void setup(Mapper.Context context) throws IOException {
        Configuration conf = context.getConfiguration();
        this.startTimestampMs = conf.getLong("start_timestamp_ms", -1L);
        this.numThreads = conf.getInt(NUM_THREADS_KEY, 1);
        this.rateFactor = conf.getDouble(RATE_FACTOR_KEY, 1.0);
        try {
            this.commandParser = (AuditCommandParser)conf.getClass(COMMAND_PARSER_KEY, COMMAND_PARSER_DEFAULT, AuditCommandParser.class).getConstructor(new Class[0]).newInstance(new Object[0]);
        }
        catch (IllegalAccessException | InstantiationException | NoSuchMethodException | InvocationTargetException e) {
            throw new IOException("Exception encountered while instantiating the command parser", e);
        }
        this.commandParser.initialize(conf);
        this.relativeToAbsoluteTimestamp = input -> this.startTimestampMs + Math.round((double)input.longValue() / this.rateFactor);
        LOG.info("Starting " + this.numThreads + " threads");
        this.progressExecutor = new ScheduledThreadPoolExecutor(1);
        long progressFrequencyMs = conf.getLong("mapreduce.task.timeout", 120000L) / 2L;
        this.progressExecutor.scheduleAtFixedRate(() -> ((Mapper.Context)context).progress(), progressFrequencyMs, progressFrequencyMs, TimeUnit.MILLISECONDS);
        this.threads = new ArrayList<AuditReplayThread>();
        ConcurrentHashMap<String, FileSystem> fsCache = new ConcurrentHashMap<String, FileSystem>();
        this.commandQueue = new DelayQueue();
        for (int i = 0; i < this.numThreads; ++i) {
            AuditReplayThread thread = new AuditReplayThread(context, this.commandQueue, fsCache);
            this.threads.add(thread);
            thread.start();
        }
    }

    public void map(LongWritable lineNum, Text inputLine, Mapper.Context context) throws IOException, InterruptedException {
        AuditReplayCommand cmd = this.commandParser.parse(lineNum.get(), inputLine, this.relativeToAbsoluteTimestamp);
        long delay = cmd.getDelay(TimeUnit.MILLISECONDS);
        if (delay > 60000L) {
            Thread.sleep(delay - 30000L);
        }
        this.commandQueue.put(cmd);
        this.highestTimestamp = cmd.getAbsoluteTimestamp();
        this.highestSequence = cmd.getSequence();
    }

    public void cleanup(Mapper.Context context) throws InterruptedException, IOException {
        for (AuditReplayThread auditReplayThread : this.threads) {
            auditReplayThread.addToQueue(AuditReplayCommand.getPoisonPill(this.highestTimestamp + 1L));
        }
        Optional<Object> threadException = Optional.empty();
        for (AuditReplayThread t : this.threads) {
            t.join();
            t.drainCounters(context);
            t.drainCommandLatencies(context);
            if (t.getException() == null) continue;
            threadException = Optional.of(t.getException());
        }
        this.progressExecutor.shutdown();
        if (threadException.isPresent()) {
            throw new RuntimeException("Exception in AuditReplayThread", (Throwable)threadException.get());
        }
        LOG.info("Time taken to replay the logs in ms: " + (System.currentTimeMillis() - this.startTimestampMs));
        long l = context.getCounter((Enum)REPLAYCOUNTERS.TOTALCOMMANDS).getValue();
        if (l != 0L) {
            double percentageOfInvalidOps = (double)context.getCounter((Enum)REPLAYCOUNTERS.TOTALINVALIDCOMMANDS).getValue() * 100.0 / (double)l;
            LOG.info("Percentage of invalid ops: " + percentageOfInvalidOps);
        }
    }

    @Override
    public void configureJob(Job job) {
        job.setMapOutputKeyClass(UserCommandKey.class);
        job.setMapOutputValueClass(CountTimeWritable.class);
        job.setInputFormatClass(NoSplitTextInputFormat.class);
        job.setNumReduceTasks(1);
        job.setReducerClass(AuditReplayReducer.class);
        job.setOutputKeyClass(UserCommandKey.class);
        job.setOutputValueClass(CountTimeWritable.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath((Job)job, (Path)new Path(job.getConfiguration().get(OUTPUT_PATH_KEY)));
        job.getConfiguration().set(TextOutputFormat.SEPARATOR, ",");
    }

    public static enum REPLAYCOUNTERS {
        TOTALCOMMANDS,
        TOTALINVALIDCOMMANDS,
        TOTALUNSUPPORTEDCOMMANDS,
        LATECOMMANDS,
        LATECOMMANDSTOTALTIME,
        TOTALWRITECOMMANDS,
        TOTALWRITECOMMANDLATENCY,
        TOTALREADCOMMANDS,
        TOTALREADCOMMANDLATENCY;

    }

    public static enum CommandType {
        READ,
        WRITE;

    }

    public static enum ReplayCommand {
        APPEND(CommandType.WRITE),
        CREATE(CommandType.WRITE),
        GETFILEINFO(CommandType.READ),
        CONTENTSUMMARY(CommandType.READ),
        MKDIRS(CommandType.WRITE),
        RENAME(CommandType.WRITE),
        LISTSTATUS(CommandType.READ),
        DELETE(CommandType.WRITE),
        OPEN(CommandType.READ),
        SETPERMISSION(CommandType.WRITE),
        SETOWNER(CommandType.WRITE),
        SETTIMES(CommandType.WRITE),
        SETREPLICATION(CommandType.WRITE),
        CONCAT(CommandType.WRITE);

        private final CommandType type;

        private ReplayCommand(CommandType type) {
            this.type = type;
        }

        public CommandType getType() {
            return this.type;
        }
    }
}

