/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapreduce;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.StringTokenizer;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MiniMRClientCluster;
import org.apache.hadoop.mapred.MiniMRClientClusterFactory;
import org.apache.hadoop.mapred.Utils;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RandomTextWriter;
import org.apache.hadoop.mapreduce.TaskCounter;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;
import org.apache.hadoop.mapreduce.security.IntermediateEncryptedStream;
import org.apache.hadoop.mapreduce.security.SpillCallBackInjector;
import org.apache.hadoop.mapreduce.security.SpillCallBackPathsFinder;
import org.apache.hadoop.mapreduce.util.MRJobConfUtil;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestMRIntermediateDataEncryption {
    /** Logger shared by the test class and its nested helper classes. */
    public static final Logger LOG = LoggerFactory.getLogger(TestMRIntermediateDataEncryption.class);
    /** 128; presumably the total amount of generated input text in megabytes (name-based, confirm). */
    public static final long TOTAL_MBS_DEFAULT = 128L;
    /** 0x2000000 bytes (32 MiB); presumably the DFS block size used for the test cluster (confirm). */
    public static final long BLOCK_SIZE_DEFAULT = 0x2000000L;
    /** 16; presumably the number of threads used to generate the input text file (confirm). */
    public static final int INPUT_GEN_NUM_THREADS = 16;
    /** 128; presumably the value used for the task sort buffer size in MB (confirm). */
    public static final long TASK_SORT_IO_MB_DEFAULT = 128L;
    /** Relative path of the directory under which every job's input and output directories are created. */
    public static final String JOB_DIR_PATH = "jobs-data-path";
    // ---- State shared by all parameterized runs; initialized by setupClass(). ----
    // Local root directory of this test class, obtained from GenericTestUtils.
    private static File testRootDir;
    // Writer for the local "input.txt" file; lazily created by getTextInputWriter().
    private static volatile BufferedWriter inputBufferedWriter;
    // Base configuration that per-job configurations are copied from.
    private static Configuration commonConfig;
    private static MiniDFSCluster dfsCluster;
    private static MiniMRClientCluster mrCluster;
    // File system of the mini DFS cluster.
    private static FileSystem fs;
    // Checksum of the reference job's single output file; compared against each test job's output.
    private static FileChecksum checkSumReference;
    // DFS directory that holds the generated input text file.
    private static Path jobInputDirPath;
    // Length of the input text file on DFS; used to derive the minimum input split size.
    private static long inputFileSize;
    // ---- Per-invocation state; assigned by initTestMRIntermediateDataEncryption() and setup(). ----
    private String testTitleName;
    private int numMappers;
    private int numReducers;
    private boolean isUber;
    // Job configuration for the current run, copied from commonConfig in setup().
    private Configuration config;
    // Output directory of the current run's word-count job.
    private Path jobOutputPath;

    /**
     * Records the parameters of one parameterized invocation on this instance and then
     * prepares the per-test directory and job configuration through {@link #setup()}.
     *
     * @param pTestName    title of the test case; also used as the job directory name
     * @param pMappers     number of map tasks requested for the job
     * @param pReducers    number of reduce tasks for the job
     * @param pUberEnabled whether uber mode is enabled for the job
     * @throws Exception if {@link #setup()} fails
     */
    public void initTestMRIntermediateDataEncryption(String pTestName, int pMappers, int pReducers, boolean pUberEnabled) throws Exception {
        isUber = pUberEnabled;
        numReducers = pReducers;
        numMappers = pMappers;
        testTitleName = pTestName;
        setup();
    }

    public static Collection<Object[]> getTestParameters() {
        return Arrays.asList({"testSingleReducer", 3, 1, false}, {"testUberMode", 3, 1, true}, {"testMultipleMapsPerNode", 8, 1, false}, {"testMultipleReducers", 2, 4, false});
    }

    /**
     * One-time fixture: creates the test root directory and base configuration, starts a
     * two-datanode mini DFS cluster and a mini MR cluster, recreates the jobs directory on
     * DFS, generates the input text file and runs the reference job whose output checksum
     * the test cases compare against.
     *
     * @throws Exception if a cluster cannot be started, the jobs directory cannot be
     *                   cleared, or input generation / the reference job fails
     */
    @BeforeAll
    public static void setupClass() throws Exception {
        testRootDir = GenericTestUtils.setupTestRootDir(TestMRIntermediateDataEncryption.class);
        commonConfig = TestMRIntermediateDataEncryption.createBaseConfiguration();

        // Bring up the DFS cluster first; the MR cluster is created from the same configuration.
        File dfsBaseDir = new File(testRootDir, "dfs");
        MiniDFSCluster.Builder dfsBuilder = new MiniDFSCluster.Builder(commonConfig, dfsBaseDir);
        dfsCluster = dfsBuilder.numDataNodes(2).build();
        dfsCluster.waitActive();

        mrCluster = MiniMRClientClusterFactory.create(TestMRIntermediateDataEncryption.class, 2, commonConfig);
        mrCluster.start();

        // Start from an empty jobs directory.
        fs = dfsCluster.getFileSystem();
        Path jobsRoot = new Path(JOB_DIR_PATH);
        if (fs.exists(jobsRoot)) {
            boolean removed = fs.delete(jobsRoot, true);
            if (!removed) {
                throw new IOException("Could not delete JobsDirPath" + jobsRoot);
            }
        }
        fs.mkdirs(jobsRoot);
        jobInputDirPath = new Path(jobsRoot, "in-dir");

        int inputGenStatus = TestMRIntermediateDataEncryption.generateInputTextFile();
        Assertions.assertEquals(0, inputGenStatus, "Generating input should succeed");
        TestMRIntermediateDataEncryption.runReferenceJob();
    }

    /**
     * One-time cleanup: stops the MR cluster, shuts down the DFS cluster and removes the
     * locally generated {@code input.txt} if it still exists.
     *
     * <p>The steps are chained with {@code finally} so that a failure while stopping the MR
     * cluster does not leave the DFS cluster running or the input file behind.
     *
     * @throws IOException if stopping the MR cluster fails
     */
    @AfterAll
    public static void tearDown() throws IOException {
        try {
            if (mrCluster != null) {
                mrCluster.stop();
            }
        } finally {
            try {
                if (dfsCluster != null) {
                    dfsCluster.shutdown();
                }
            } finally {
                // The input file is normally moved to DFS during setup; this only triggers
                // when setup did not get that far.
                File textInputFile = new File(testRootDir, "input.txt");
                if (textInputFile.exists()) {
                    Assertions.assertTrue(textInputFile.delete());
                }
            }
        }
    }

    /**
     * Builds the configuration shared by the clusters and all jobs: the encrypted
     * intermediate-data test settings from {@code MRJobConfUtil}, local directories rooted
     * at {@code testRootDir}, and a DFS block size of 0x2000000 bytes (32 MiB).
     *
     * @return the base configuration
     */
    private static Configuration createBaseConfiguration() {
        Configuration encryptedConf = MRJobConfUtil.initEncryptedIntermediateConfigsForTesting(null);
        Configuration baseConf = MRJobConfUtil.setLocalDirectoriesConfigForTesting(encryptedConf, testRootDir);
        baseConf.setLong("dfs.blocksize", 0x2000000L);
        return baseConf;
    }

    /**
     * Returns the shared writer for the local {@code input.txt} file, creating it on the
     * first call. The method is synchronized, so concurrent callers receive the same writer.
     *
     * @return the shared input-file writer
     * @throws IOException if the file cannot be opened for writing
     */
    private static synchronized BufferedWriter getTextInputWriter() throws IOException {
        BufferedWriter writer = inputBufferedWriter;
        if (writer != null) {
            return writer;
        }
        writer = new BufferedWriter(new FileWriter(new File(testRootDir, "input.txt")));
        inputBufferedWriter = writer;
        return writer;
    }

    /**
     * Generates the local {@code input.txt} with a pool of {@link InputGeneratorTask}s that
     * all write to the shared writer, then moves the file into {@code jobInputDirPath} on
     * DFS and records its length in {@code inputFileSize}.
     *
     * <p>The executor is shut down and the shared writer is closed even when a generator
     * task fails, so a failed setup does not leak pool threads or the open file handle.
     *
     * @return {@code 0} on success, {@code 1} if the file is missing on DFS after the copy
     * @throws Exception if a generator task fails or a file-system operation fails
     */
    private static int generateInputTextFile() throws Exception {
        final int numTasks = TestMRIntermediateDataEncryption.INPUT_GEN_NUM_THREADS;
        File textInputFile = new File(testRootDir, "input.txt");
        // Only this thread accumulates the total, so a plain long is sufficient.
        long totalWrittenBytes = 0L;
        ExecutorService executor = Executors.newFixedThreadPool(numTasks);
        long startTime = Time.monotonicNow();
        // The resource is the same writer instance the tasks obtain via getTextInputWriter();
        // holding it here guarantees it is closed once all tasks are done or one has failed.
        try (BufferedWriter sharedWriter = TestMRIntermediateDataEncryption.getTextInputWriter()) {
            InputGeneratorTask callableGen = new InputGeneratorTask();
            Collection<Future<Long>> inputGenerators = new ArrayList<>(numTasks);
            for (int i = 0; i < numTasks; ++i) {
                inputGenerators.add(executor.submit(callableGen));
            }
            for (Future<Long> genFutureTask : inputGenerators) {
                totalWrittenBytes += genFutureTask.get();
                LOG.info("Received one task. Current total bytes: {}", totalWrittenBytes);
            }
        } finally {
            executor.shutdown();
        }
        long endTime = Time.monotonicNow();
        LOG.info("Finished generating input. Wrote {} bytes in {} seconds", totalWrittenBytes, (double)(endTime - startTime) / 1000.0);
        fs.mkdirs(jobInputDirPath);
        Path textInputPath = fs.makeQualified(new Path(jobInputDirPath, "input.txt"));
        // delSrc=true: the local file is removed once it has been copied to DFS.
        fs.copyFromLocalFile(true, new Path(textInputFile.getAbsolutePath()), textInputPath);
        if (!fs.exists(textInputPath)) {
            return 1;
        }
        FileStatus[] fileStatus = fs.listStatus(textInputPath);
        inputFileSize = fileStatus[0].getLen();
        LOG.info("Text input file; path: {}, size: {}", textInputPath, inputFileSize);
        return 0;
    }

    /**
     * Runs the word-count job once with intermediate-data encryption disabled (4 mappers,
     * 1 reducer) and stores the checksum of its single output file in
     * {@code checkSumReference}. The reference job directory is deleted afterwards.
     *
     * @throws Exception if the directory cannot be prepared or removed, or the job fails
     */
    private static void runReferenceJob() throws Exception {
        final String jobRefLabel = "job-reference";
        final Path refDir = new Path(JOB_DIR_PATH, jobRefLabel);
        if (fs.exists(refDir)) {
            boolean removed = fs.delete(refDir, true);
            if (!removed) {
                throw new IOException("Could not delete " + refDir);
            }
        }
        Assertions.assertTrue(fs.mkdirs(refDir));

        // Same base settings as the tests, but without encrypting intermediate data.
        Configuration plainConf = new Configuration(commonConfig);
        plainConf.setBoolean("mapreduce.job.encrypted-intermediate-data", false);
        Path refOutDir = new Path(refDir, "out-dir");
        Job refJob = TestMRIntermediateDataEncryption.runWordCountJob(jobRefLabel, refOutDir, plainConf, 4, 1);
        Assertions.assertTrue(refJob.isSuccessful());

        FileStatus[] outputs = fs.listStatus(refOutDir, (PathFilter)new Utils.OutputFileUtils.OutputFilesFilter());
        Assertions.assertEquals(1, outputs.length);
        checkSumReference = fs.getFileChecksum(outputs[0].getPath());
        Assertions.assertTrue(fs.delete(refDir, true));
    }

    /**
     * Configures and runs a word-count job over {@code jobInputDirPath} and waits for it to
     * finish. On success, the path and size of every output file are logged.
     *
     * @param postfixName suffix appended to {@code "mr-spill-"} to form the job name
     * @param jOutputPath output directory of the job
     * @param jConf       configuration the job is created from
     * @param mappers     requested number of map tasks; also used to derive the minimum
     *                    input split size from {@code inputFileSize}
     * @param reducers    number of reduce tasks
     * @return the job after completion, whether or not it succeeded
     * @throws Exception if the job cannot be submitted or monitored
     */
    private static Job runWordCountJob(String postfixName, Path jOutputPath, Configuration jConf, int mappers, int reducers) throws Exception {
        Job job = Job.getInstance(jConf);
        job.getConfiguration().setInt("mapreduce.job.maps", mappers);
        job.setJarByClass(TestMRIntermediateDataEncryption.class);
        job.setJobName("mr-spill-" + postfixName);

        // Map side: tokenize text lines; the sum reducer doubles as the combiner.
        job.setMapperClass(TokenizerMapper.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setCombinerClass(LongSumReducer.class);
        long minSplitSize = (inputFileSize + mappers) / mappers;
        FileInputFormat.setMinInputSplitSize(job, minSplitSize);

        // Reduce side.
        job.setReducerClass(LongSumReducer.class);
        job.setNumReduceTasks(reducers);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        FileInputFormat.addInputPath(job, jobInputDirPath);
        FileOutputFormat.setOutputPath(job, jOutputPath);

        if (!job.waitForCompletion(true)) {
            return job;
        }
        FileStatus[] outputs = fs.listStatus(jOutputPath, (PathFilter)new Utils.OutputFileUtils.OutputFilesFilter());
        for (FileStatus output : outputs) {
            LOG.info("Job: {} .. Output file {} .. Size = {}", postfixName, output.getPath(), output.getLen());
        }
        return job;
    }

    /**
     * Compares the checksum of this run's output with {@code checkSumReference}.
     *
     * <p>When the job did not use exactly one reducer, its output is first fed through an
     * unencrypted single-reducer job ({@link CombinerJobMapper} plus {@code LongSumReducer})
     * and the checksum of that job's first output file is used instead.
     *
     * @return {@code true} if the checksums are equal; {@code false} if they differ or the
     *         merging job fails
     * @throws Exception if a file-system or job operation fails
     */
    private boolean validateJobOutput() throws Exception {
        Assertions.assertTrue(fs.exists(jobOutputPath), "Job Output path [" + jobOutputPath + "] should exist");
        Path pathToVerify = jobOutputPath;
        boolean needsMerge = numReducers != 1;
        if (needsMerge) {
            String mergeLabel = testTitleName + "-combine";
            Path mergeDir = new Path(JOB_DIR_PATH, mergeLabel);
            if (fs.exists(mergeDir)) {
                boolean removed = fs.delete(mergeDir, true);
                if (!removed) {
                    throw new IOException("Could not delete " + mergeDir);
                }
            }
            fs.mkdirs(mergeDir);
            pathToVerify = new Path(mergeDir, "out-dir");

            Configuration plainConf = new Configuration(commonConfig);
            plainConf.setBoolean("mapreduce.job.encrypted-intermediate-data", false);
            Job mergeJob = Job.getInstance(plainConf);
            mergeJob.setJarByClass(TestMRIntermediateDataEncryption.class);
            mergeJob.setJobName("mr-spill-" + mergeLabel);
            mergeJob.setMapperClass(CombinerJobMapper.class);
            FileInputFormat.addInputPath(mergeJob, jobOutputPath);
            mergeJob.setReducerClass(LongSumReducer.class);
            mergeJob.setNumReduceTasks(1);
            mergeJob.setOutputKeyClass(Text.class);
            mergeJob.setOutputValueClass(LongWritable.class);
            FileOutputFormat.setOutputPath(mergeJob, pathToVerify);
            boolean merged = mergeJob.waitForCompletion(true);
            if (!merged) {
                return false;
            }
            FileStatus[] mergedOutputs = fs.listStatus(pathToVerify, (PathFilter)new Utils.OutputFileUtils.OutputFilesFilter());
            FileStatus mergedFile = mergedOutputs[0];
            LOG.info("Job-Combination: {} .. Output file {} .. Size = {}", mergeDir, mergedFile.getPath(), mergedFile.getLen());
        }
        FileStatus[] candidates = fs.listStatus(pathToVerify, (PathFilter)new Utils.OutputFileUtils.OutputFilesFilter());
        Path fileToVerify = candidates[0].getPath();
        return checkSumReference.equals(fs.getFileChecksum(fileToVerify));
    }

    /**
     * Prepares the current run: recreates the job directory named after the test title and
     * builds the job configuration (uber flag, reduce slow-start of 1.0, sort buffer of
     * 128 MB, map memory and heap derived from it, requested number of maps, and a single
     * attempt per map and reduce task).
     *
     * @throws Exception if the job directory cannot be cleared or created
     */
    public void setup() throws Exception {
        LOG.info("Starting TestMRIntermediateDataEncryption#{}.......", testTitleName);
        Path runDir = new Path(JOB_DIR_PATH, testTitleName);
        if (fs.exists(runDir)) {
            boolean removed = fs.delete(runDir, true);
            if (!removed) {
                throw new IOException("Could not delete " + runDir);
            }
        }
        fs.mkdirs(runDir);
        jobOutputPath = new Path(runDir, "out-dir");

        config = new Configuration(commonConfig);
        config.setBoolean("mapreduce.job.ubertask.enable", isUber);
        config.setFloat("mapreduce.job.reduce.slowstart.completedmaps", 1.0f);

        final long ioSortMb = 128L;
        config.setLong("mapreduce.task.io.sort.mb", ioSortMb);
        // Map container memory: at least twice the sort buffer, never below the configured value.
        long configuredMapMb = config.getInt("mapreduce.map.memory.mb", 1024);
        long mapMb = Math.max(2L * ioSortMb, configuredMapMb);
        config.setLong("mapreduce.map.memory.mb", mapMb);
        // Heap is set 200 MB below the map memory value.
        config.set("mapreduce.map.java.opts", "-Xmx" + (mapMb - 200L) + "m");

        config.setInt("mapreduce.job.maps", numMappers);
        config.setInt("mapreduce.map.maxattempts", 1);
        config.setInt("mapreduce.reduce.maxattempts", 1);
    }

    /**
     * Runs the word-count job with the given parameters and checks that it succeeds, that
     * its output matches the reference checksum, that records were spilled, and that the
     * spill injector saw encrypted spill files and no invalid spill entries.
     *
     * <p>A textual summary, including the injector's spilled-file report, is logged and the
     * spill injector is reset whether or not the assertions pass.
     *
     * @param pTestName    title of the test case
     * @param pMappers     number of map tasks
     * @param pReducers    number of reduce tasks
     * @param pUberEnabled whether uber mode is enabled
     * @throws Exception if preparing or running the job fails
     */
    @ParameterizedTest(name="{index}: TestMRIntermediateDataEncryption.{0} .. mappers:{1}, reducers:{2}, isUber:{3})")
    @MethodSource("getTestParameters")
    public void testWordCount(String pTestName, int pMappers, int pReducers, boolean pUberEnabled) throws Exception {
        initTestMRIntermediateDataEncryption(pTestName, pMappers, pReducers, pUberEnabled);
        LOG.info("........Starting main Job Driver #{} starting at {}.......", testTitleName, Time.formatTime(System.currentTimeMillis()));
        // Install a fresh paths finder as the spill callback injector for the duration of the job.
        SpillCallBackPathsFinder spillInjector = (SpillCallBackPathsFinder)IntermediateEncryptedStream.setSpillCBInjector(new SpillCallBackPathsFinder());
        StringBuilder testSummary = new StringBuilder(String.format("%n ===== test %s summary ======", testTitleName));
        try {
            long startTime = Time.monotonicNow();
            testSummary.append(String.format("%nJob %s started at %s", testTitleName, Time.formatTime(System.currentTimeMillis())));
            Job job = TestMRIntermediateDataEncryption.runWordCountJob(testTitleName, jobOutputPath, config, numMappers, numReducers);
            Assertions.assertTrue(job.isSuccessful());
            long endTime = Time.monotonicNow();
            testSummary.append(String.format("%nJob %s ended at %s", job.getJobName(), Time.formatTime(System.currentTimeMillis())));
            double elapsedSeconds = (double)(endTime - startTime) / 1000.0;
            testSummary.append(String.format("%n\tThe job took %.3f seconds", elapsedSeconds));

            FileStatus[] outputs = fs.listStatus(jobOutputPath, (PathFilter)new Utils.OutputFileUtils.OutputFilesFilter());
            for (FileStatus output : outputs) {
                testSummary.append(String.format("%n\tOutput file %s: %d", output.getPath(), output.getLen()));
            }

            Assertions.assertTrue(validateJobOutput());
            long spilledRecords = job.getCounters().findCounter(TaskCounter.SPILLED_RECORDS).getValue();
            Assertions.assertTrue(spilledRecords > 0L, "Spill records must be greater than 0");
            Assertions.assertFalse(spillInjector.getEncryptedSpilledFiles().isEmpty(), "The encrypted spilled files should not be empty.");
            Assertions.assertTrue(spillInjector.getInvalidSpillEntries().isEmpty(), "Invalid access to spill file positions");
        } finally {
            testSummary.append(spillInjector.getSpilledFileReport());
            LOG.info(testSummary.toString());
            IntermediateEncryptedStream.resetSpillCBInjector();
        }
    }

    public static class CombinerJobMapper
    extends Mapper<Object, Text, Text, LongWritable> {
        private final LongWritable sum = new LongWritable(0L);
        private final Text word = new Text();

        public void map(Object key, Text value, Mapper.Context context) throws IOException, InterruptedException {
            String[] line = value.toString().split("\\s+");
            this.sum.set(Long.parseLong(line[1]));
            this.word.set(line[0]);
            context.write((Object)this.word, (Object)this.sum);
        }
    }

    public static class TokenizerMapper
    extends Mapper<Object, Text, Text, LongWritable> {
        private static final LongWritable ONE = new LongWritable(1L);
        private final Text word = new Text();

        public void map(Object key, Text value, Mapper.Context context) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                this.word.set(itr.nextToken());
                context.write((Object)this.word, (Object)ONE);
            }
        }
    }

    /**
     * Generator task that appends random sentences to the shared input writer until its
     * share of the total input size has been written.
     */
    static class InputGeneratorTask
    implements Callable<Long> {
        InputGeneratorTask() {
        }

        /**
         * Writes random sentences, each terminated by the platform line separator, to the
         * shared writer until this task's quota is reached, then flushes the writer.
         *
         * @return the number of characters written by this task (the quota may be exceeded
         *         by up to one sentence)
         * @throws Exception if writing to or flushing the shared writer fails
         */
        @Override
        public Long call() throws Exception {
            ThreadLocalRandom rand = ThreadLocalRandom.current();
            // Per-task quota: the total input size split evenly across the generator threads.
            long totalBytes = 1024L * 1024L * TestMRIntermediateDataEncryption.TOTAL_MBS_DEFAULT;
            long bytesPerTask = totalBytes / TestMRIntermediateDataEncryption.INPUT_GEN_NUM_THREADS;
            String newLine = System.lineSeparator();
            BufferedWriter writer = TestMRIntermediateDataEncryption.getTextInputWriter();
            long bytesWritten = 0L;
            while (bytesWritten < bytesPerTask) {
                String sentence = RandomTextWriter.generateSentenceWithRand(rand, rand.nextInt(5, 20)).concat(newLine);
                writer.write(sentence);
                // NOTE(review): counts chars, which equals bytes only if the generated text is
                // encoded one byte per char - confirm against RandomTextWriter's word list.
                bytesWritten += sentence.length();
            }
            writer.flush();
            LOG.info("Task {} finished. Wrote {} bytes.", Thread.currentThread().getName(), bytesWritten);
            return bytesWritten;
        }
    }
}

