/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapreduce;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.jar.JarOutputStream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.JobResourceUploader;
import org.apache.hadoop.mapreduce.SharedCacheConfig;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.client.api.SharedCacheClient;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestJobResourceUploaderWithSharedCache {
    /** Logger used by the lifecycle methods of this test class. */
    protected static final Logger LOG = LoggerFactory.getLogger(TestJobResourceUploaderWithSharedCache.class);
    /** Mini DFS cluster; built in {@link #setup()} and shut down in {@link #tearDown()}. */
    private static MiniDFSCluster dfs;
    /** Local file system obtained in {@link #setup()}; temp files, jars and archives are created on it. */
    private static FileSystem localFs;
    /** File system of the mini DFS cluster; passed to the uploader as the submit file system. */
    private static FileSystem remoteFs;
    /** Base configuration; assigned in the static initializer of this class. */
    private static Configuration conf;
    /** Qualified local directory ("target/&lt;class name&gt;-tmpDir") under which test artifacts are created. */
    private static Path testRootDir;
    /** Staging directory handed to {@code uploadResources}; deleted on the remote file system before each test. */
    private static Path remoteStagingDir;
    /** Payload written into the single entry of every archive built by {@code makeArchive}. */
    private String input = "roses.are.red\nviolets.are.blue\nbunnies.are.pink\n";

    /**
     * Recursively deletes the remote staging directory before each test.
     *
     * @throws Exception if the delete call on the remote file system fails
     */
    @BeforeEach
    public void cleanup() throws Exception {
        final boolean recursive = true;
        remoteFs.delete(remoteStagingDir, recursive);
    }

    /**
     * One-time fixture: obtains the local file system, derives the qualified
     * local test root directory, and builds a freshly formatted single-datanode
     * mini DFS cluster whose file system becomes {@code remoteFs}.
     *
     * @throws IOException if the local file system or the cluster cannot be created
     */
    @BeforeAll
    public static void setup() throws IOException {
        localFs = FileSystem.getLocal(conf);

        // Local scratch directory: target/<fully qualified class name>-tmpDir
        String tmpDirName = TestJobResourceUploaderWithSharedCache.class.getName() + "-tmpDir";
        Path relativeRoot = new Path("target", tmpDirName);
        testRootDir = relativeRoot.makeQualified(localFs.getUri(), localFs.getWorkingDirectory());

        MiniDFSCluster.Builder clusterBuilder = new MiniDFSCluster.Builder(conf);
        dfs = clusterBuilder.numDataNodes(1).format(true).build();
        remoteFs = dfs.getFileSystem();
    }

    /**
     * Releases everything {@link #setup()} created.
     *
     * <p>Each resource is released independently: a failure to close one file
     * system is logged and does not prevent the other file system from being
     * closed or the mini DFS cluster from being shut down.
     */
    @AfterAll
    public static void tearDown() {
        try {
            closeAndLog(localFs);
            closeAndLog(remoteFs);
        } finally {
            // Always stop the cluster, even if closing a file system failed unexpectedly.
            if (dfs != null) {
                dfs.shutdown();
            }
        }
    }

    /**
     * Closes {@code fs} if it is non-null, logging (not propagating) any
     * {@link IOException} together with its stack trace.
     */
    private static void closeAndLog(FileSystem fs) {
        if (fs == null) {
            return;
        }
        try {
            fs.close();
        } catch (IOException ioe) {
            LOG.warn("IO exception in closing file system", ioe);
        }
    }

    /**
     * Leaves "mapreduce.job.sharedcache.mode" unset and expects zero
     * {@code SharedCacheClient.use} calls, zero files and zero archives
     * flagged for upload to the shared cache.
     */
    @Test
    public void testSharedCacheDisabled() throws Exception {
        final JobConf jc = createJobConf();
        final Job submission = new Job(jc);
        final JobID id = new JobID("567789", 1);
        submission.setJobID(id);
        // NOTE(review): the helper mutates jc after the Job is built; this assumes
        // the Job keeps using the same JobConf instance - confirm.
        uploadFilesToRemoteFS(submission, jc, 0, 0, 0, false);
    }

    /**
     * Sets "mapreduce.job.sharedcache.mode" to "enabled" and expects 8
     * {@code SharedCacheClient.use} calls, 3 files and 2 archives flagged for
     * upload to the shared cache; the job jar is not pre-registered in the cache.
     */
    @Test
    public void testSharedCacheEnabled() throws Exception {
        final JobConf jc = createJobConf();
        jc.set("mapreduce.job.sharedcache.mode", "enabled");
        final Job submission = new Job(jc);
        final JobID id = new JobID("567789", 1);
        submission.setJobID(id);
        uploadFilesToRemoteFS(submission, jc, 8, 3, 2, false);
    }

    /**
     * Same expectations as the plain "enabled" case (8 {@code use} calls,
     * 3 files, 2 archives), but the job jar is registered with the mocked
     * shared cache before the upload runs.
     */
    @Test
    public void testSharedCacheEnabledWithJobJarInSharedCache() throws Exception {
        final JobConf jc = createJobConf();
        jc.set("mapreduce.job.sharedcache.mode", "enabled");
        final Job submission = new Job(jc);
        final JobID id = new JobID("567789", 1);
        submission.setJobID(id);
        uploadFilesToRemoteFS(submission, jc, 8, 3, 2, true);
    }

    /**
     * Sets "mapreduce.job.sharedcache.mode" to "archives,libjars" and expects
     * 5 {@code SharedCacheClient.use} calls, 1 file and 2 archives flagged for
     * upload to the shared cache, with the job jar pre-registered in the cache.
     */
    @Test
    public void testSharedCacheArchivesAndLibjarsEnabled() throws Exception {
        final JobConf jc = createJobConf();
        jc.set("mapreduce.job.sharedcache.mode", "archives,libjars");
        final Job submission = new Job(jc);
        final JobID id = new JobID("567789", 1);
        submission.setJobID(id);
        uploadFilesToRemoteFS(submission, jc, 5, 1, 2, true);
    }

    /**
     * Builds the job configuration shared by all tests: YARN framework,
     * "yarn.sharedcache.enabled" switched on, and the default file system
     * pointed at the mini DFS cluster.
     *
     * @return a new {@link JobConf} with those three settings applied
     */
    private JobConf createJobConf() {
        final JobConf result = new JobConf();
        result.set("mapreduce.framework.name", "yarn");
        result.setBoolean("yarn.sharedcache.enabled", true);
        final String remoteUri = remoteFs.getUri().toString();
        result.set("fs.defaultFS", remoteUri);
        return result;
    }

    /**
     * Copies a local file to "/tmp/&lt;file name&gt;" on the remote file system.
     *
     * @param jar local path of the file to copy (any file, not only jars)
     * @return the remote destination path
     * @throws IOException if the copy fails
     */
    private Path copyToRemote(Path jar) throws IOException {
        final String name = jar.getName();
        final Path destination = new Path("/tmp", name);
        remoteFs.copyFromLocalFile(jar, destination);
        return destination;
    }

    /**
     * Makes {@code jar} look as if it were already in the shared cache: the file
     * is copied to the remote file system and the uploader's mock client is
     * primed to return that remote location for the jar.
     *
     * @param jar local jar to register
     * @param fileUploader uploader whose mocked shared cache client is primed
     * @throws YarnException declared by the mock-priming call
     * @throws IOException if the copy or the priming call fails
     */
    private void makeJarAvailableInSharedCache(Path jar, MyFileUploader fileUploader) throws YarnException, IOException {
        final URL remoteLocation = URL.fromPath(copyToRemote(jar));
        fileUploader.mockFileInSharedCache(jar, remoteLocation);
    }

    /**
     * Drives one upload through a {@link MyFileUploader} and checks how the
     * shared cache was used.
     *
     * <p>For each resource kind (file, libjar, archive) one resource is offered
     * to the shared cache through the {@link Job} static helpers, asserting that
     * the helper's result matches the corresponding {@link SharedCacheConfig}
     * flag. When the helper declines, the resource is copied to the remote file
     * system and added to the job directly. A second resource of each kind is
     * supplied through "tmpfiles" / "tmpjars" / "tmparchives". After
     * {@code uploadResources} runs, the number of {@code SharedCacheClient.use}
     * calls and the number of enabled upload policies are verified.
     *
     * @param job job whose resources are uploaded
     * @param jobConf configuration that is mutated with the resources to upload
     * @param useCallCountExpected expected number of {@code use} calls on the mock client
     * @param numOfFilesShouldBeUploadedToSharedCacheExpected expected count of file
     *        upload policies set to {@code true}
     * @param numOfArchivesShouldBeUploadedToSharedCacheExpected expected count of
     *        archive upload policies set to {@code true}
     * @param jobJarInSharedCacheBeforeUpload whether the job jar is registered with
     *        the mocked shared cache before uploading
     * @throws Exception on any fixture, upload or verification failure
     */
    private void uploadFilesToRemoteFS(Job job, JobConf jobConf, int useCallCountExpected, int numOfFilesShouldBeUploadedToSharedCacheExpected, int numOfArchivesShouldBeUploadedToSharedCacheExpected, boolean jobJarInSharedCacheBeforeUpload) throws Exception {
        MyFileUploader fileUploader = new MyFileUploader(remoteFs, jobConf);
        SharedCacheConfig sharedCacheConfig = new SharedCacheConfig();
        sharedCacheConfig.init(jobConf);

        // --- plain files ---
        Path firstFile = createTempFile("first-input-file", "x");
        Path secondFile = createTempFile("second-input-file", "xx");
        boolean fileAdded = Job.addFileToSharedCache(firstFile.toUri(), jobConf);
        Assertions.assertEquals(sharedCacheConfig.isSharedCacheFilesEnabled(), fileAdded);
        if (!fileAdded) {
            Path remoteFile = copyToRemote(firstFile);
            job.addCacheFile(remoteFile.toUri());
        }
        jobConf.set("tmpfiles", secondFile.toString());

        // --- libjars ---
        Path firstJar = makeJar(new Path(testRootDir, "distributed.first.jar"), 1);
        Path secondJar = makeJar(new Path(testRootDir, "distributed.second.jar"), 2);
        // thirdJar is a byte-for-byte local copy of secondJar under a different name.
        Path thirdJar = new Path(testRootDir, "distributed.third.jar");
        localFs.copyFromLocalFile(secondJar, thirdJar);
        makeJarAvailableInSharedCache(secondJar, fileUploader);
        boolean libjarAdded = Job.addFileToSharedCacheAndClasspath(firstJar.toUri(), jobConf);
        Assertions.assertEquals(sharedCacheConfig.isSharedCacheLibjarsEnabled(), libjarAdded);
        if (!libjarAdded) {
            Path remoteJar = copyToRemote(firstJar);
            job.addFileToClassPath(remoteJar);
        }
        jobConf.set("tmpjars", secondJar.toString() + "," + thirdJar.toString());

        // --- archives ---
        Path firstArchive = makeArchive("first-archive.zip", "first-file");
        Path secondArchive = makeArchive("second-archive.zip", "second-file");
        boolean archiveAdded = Job.addArchiveToSharedCache(firstArchive.toUri(), jobConf);
        Assertions.assertEquals(sharedCacheConfig.isSharedCacheArchivesEnabled(), archiveAdded);
        if (!archiveAdded) {
            Path remoteArchive = copyToRemote(firstArchive);
            job.addCacheArchive(remoteArchive.toUri());
        }
        jobConf.set("tmparchives", secondArchive.toString());

        // --- job jar ---
        Path jobJar = makeJar(new Path(testRootDir, "test-job.jar"), 4);
        if (jobJarInSharedCacheBeforeUpload) {
            makeJarAvailableInSharedCache(jobJar, fileUploader);
        }
        jobConf.setJar(jobJar.toString());

        fileUploader.uploadResources(job, remoteStagingDir);

        Mockito.verify(fileUploader.mockscClient, Mockito.times(useCallCountExpected)).use(ArgumentMatchers.any(ApplicationId.class), ArgumentMatchers.anyString());

        Map<String, Boolean> filesSharedCacheUploadPolicies = Job.getFileSharedCacheUploadPolicies(jobConf);
        Assertions.assertEquals(numOfFilesShouldBeUploadedToSharedCacheExpected, countEnabledPolicies(filesSharedCacheUploadPolicies));

        Map<String, Boolean> archivesSharedCacheUploadPolicies = Job.getArchiveSharedCacheUploadPolicies(jobConf);
        Assertions.assertEquals(numOfArchivesShouldBeUploadedToSharedCacheExpected, countEnabledPolicies(archivesSharedCacheUploadPolicies));
    }

    /** Counts the entries of {@code policies} whose value is {@code true}. */
    private static int countEnabledPolicies(Map<String, Boolean> policies) {
        int enabled = 0;
        for (Boolean policy : policies.values()) {
            if (policy.booleanValue()) {
                ++enabled;
            }
        }
        return enabled;
    }

    /**
     * Creates a file with the given contents under the local test root
     * directory and sets its permission to "700".
     *
     * @param filename name of the file, relative to the test root directory
     * @param contents text written to the file via {@code writeBytes}
     * @return the path of the created file
     * @throws IOException if the file cannot be created, written or re-permissioned
     */
    private Path createTempFile(String filename, String contents) throws IOException {
        Path path = new Path(testRootDir, filename);
        // try-with-resources so the stream is closed even when the write fails
        try (FSDataOutputStream os = localFs.create(path)) {
            os.writeBytes(contents);
        }
        localFs.setPermission(path, new FsPermission("700"));
        return path;
    }

    /**
     * Writes a jar at {@code p} containing the single entry
     * "distributed.jar.inside&lt;index&gt;" and sets the file permission to "700".
     * Different indices therefore yield jars with different contents.
     *
     * @param p local path of the jar to create
     * @param index number embedded in the entry name and entry payload
     * @return {@code p}
     * @throws FileNotFoundException if the target file cannot be opened for writing
     * @throws IOException if writing the jar or setting the permission fails
     */
    private Path makeJar(Path p, int index) throws FileNotFoundException, IOException {
        File target = new File(p.toUri().getPath());
        // Both streams are managed resources so the file handle is released on every path.
        try (FileOutputStream fos = new FileOutputStream(target);
             JarOutputStream jos = new JarOutputStream(fos)) {
            ZipEntry ze = new ZipEntry("distributed.jar.inside" + index);
            jos.putNextEntry(ze);
            // Explicit charset: the payload must not depend on the platform default encoding.
            jos.write(("inside the jar!" + index).getBytes(StandardCharsets.UTF_8));
            jos.closeEntry();
        }
        localFs.setPermission(p, new FsPermission("700"));
        return p;
    }

    /**
     * Creates a zip archive under the local test root directory holding one
     * entry whose payload is the {@code input} text encoded as UTF-8.
     *
     * @param archiveFile name of the zip file, relative to the test root directory
     * @param filename name used to build the entry; the entry name is the string
     *        form of that name resolved against the test root directory
     * @return the path of the created archive
     * @throws Exception if the archive cannot be created or written
     */
    private Path makeArchive(String archiveFile, String filename) throws Exception {
        final Path zipPath = new Path(testRootDir, archiveFile);
        final String entryName = new Path(testRootDir, filename).toString();
        final FSDataOutputStream rawOut = localFs.create(zipPath);
        try (ZipOutputStream zip = new ZipOutputStream(rawOut)) {
            zip.putNextEntry(new ZipEntry(entryName));
            final byte[] payload = this.input.getBytes(StandardCharsets.UTF_8);
            zip.write(payload);
            zip.closeEntry();
        }
        return zipPath;
    }

    // Class initializer: creates the base configuration used by setup() and
    // fixes the staging directory path that is cleaned before each test and
    // passed to uploadResources.
    static {
        conf = new Configuration();
        remoteStagingDir = new Path("/tmp/hadoop-yarn/staging");
    }

    private class MyFileUploader
    extends JobResourceUploader {
        private SharedCacheClient mockscClient;
        private SharedCacheClient scClient;

        MyFileUploader(FileSystem submitFs, Configuration conf) throws IOException {
            super(submitFs, false);
            this.mockscClient = (SharedCacheClient)Mockito.mock(SharedCacheClient.class);
            this.scClient = SharedCacheClient.createSharedCacheClient();
            this.scClient.init(conf);
            Mockito.when((Object)this.mockscClient.getFileChecksum((Path)ArgumentMatchers.any(Path.class))).thenAnswer((Answer)new Answer<String>(){

                public String answer(InvocationOnMock invocation) throws Throwable {
                    Path file = (Path)invocation.getArguments()[0];
                    return MyFileUploader.this.scClient.getFileChecksum(file);
                }
            });
        }

        public void mockFileInSharedCache(Path localFile, URL remoteFile) throws YarnException, IOException {
            Mockito.when((Object)this.mockscClient.use((ApplicationId)ArgumentMatchers.any(ApplicationId.class), (String)ArgumentMatchers.eq((Object)this.scClient.getFileChecksum(localFile)))).thenReturn((Object)remoteFile);
        }

        protected SharedCacheClient createSharedCacheClient(Configuration c) {
            return this.mockscClient;
        }
    }
}

