/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.mapreduce.hadoop;

import com.google.protobuf.ByteString;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.split.JobSplit;
import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.tez.dag.api.DataSourceDescriptor;
import org.apache.tez.dag.api.TaskLocationHint;
import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
import org.apache.tez.mapreduce.hadoop.TestMRHelpers;
import org.apache.tez.mapreduce.protos.MRRuntimeProtos;
import org.apache.tez.runtime.api.events.InputDataInformationEvent;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

public class TestMRInputHelpers {
    protected static MiniDFSCluster dfsCluster;
    private static Configuration conf;
    private static FileSystem remoteFs;
    private static LocalFileSystem localFs;
    private static Path testFilePath;
    private static Path oldSplitsDir;
    private static Path newSplitsDir;
    private static Path testRootDir;
    private static Path localTestRootDir;

    @BeforeClass
    public static void setup() throws IOException {
        testRootDir = new Path(Files.createTempDirectory(TestMRHelpers.class.getName(), new FileAttribute[0]).toString());
        localTestRootDir = new Path(Files.createTempDirectory(TestMRHelpers.class.getName() + "-local", new FileAttribute[0]).toString());
        try {
            conf.set("hdfs.minidfs.basedir", testRootDir.toString());
            dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).format(true).racks(null).build();
            remoteFs = dfsCluster.getFileSystem();
        }
        catch (IOException io) {
            throw new RuntimeException("problem starting mini dfs cluster", io);
        }
        Configuration testConf = new Configuration(dfsCluster.getFileSystem().getConf());
        try (FSDataOutputStream dataOutputStream = null;){
            dataOutputStream = remoteFs.create(new Path("/tmp/input/test.xml"), true);
            testConf.writeXml((OutputStream)dataOutputStream);
            dataOutputStream.hsync();
        }
        remoteFs.mkdirs(new Path("/tmp/input/"));
        remoteFs.mkdirs(new Path("/tmp/splitsDirNew/"));
        remoteFs.mkdirs(new Path("/tmp/splitsDirOld/"));
        testFilePath = remoteFs.makeQualified(new Path("/tmp/input/test.xml"));
        FileStatus fsStatus = remoteFs.getFileStatus(testFilePath);
        Assert.assertTrue((fsStatus.getLen() > 0L ? 1 : 0) != 0);
        oldSplitsDir = remoteFs.makeQualified(new Path("/tmp/splitsDirOld/"));
        newSplitsDir = remoteFs.makeQualified(new Path("/tmp/splitsDirNew/"));
        localFs = FileSystem.getLocal((Configuration)conf);
    }

    @Test(timeout=5000L)
    public void testNewSplitsGen() throws Exception {
        DataSourceDescriptor dataSource = this.generateDataSourceDescriptorMapReduce(newSplitsDir);
        Assert.assertTrue((boolean)dataSource.getAdditionalLocalFiles().containsKey("job.split"));
        Assert.assertTrue((boolean)dataSource.getAdditionalLocalFiles().containsKey("job.splitmetainfo"));
        RemoteIterator files = remoteFs.listFiles(newSplitsDir, false);
        boolean foundSplitsFile = false;
        boolean foundMetaFile = false;
        int totalFilesFound = 0;
        while (files.hasNext()) {
            LocatedFileStatus status = (LocatedFileStatus)files.next();
            String fName = status.getPath().getName();
            ++totalFilesFound;
            if (fName.equals("job.split")) {
                foundSplitsFile = true;
            } else if (fName.equals("job.splitmetainfo")) {
                foundMetaFile = true;
            } else {
                Assert.fail((String)("Found invalid file in splits dir, filename=" + fName));
            }
            Assert.assertTrue((status.getLen() > 0L ? 1 : 0) != 0);
        }
        Assert.assertEquals((long)2L, (long)totalFilesFound);
        Assert.assertTrue((boolean)foundSplitsFile);
        Assert.assertTrue((boolean)foundMetaFile);
        this.verifyLocationHints(newSplitsDir, dataSource.getLocationHint().getTaskLocationHints());
    }

    @Test(timeout=5000L)
    public void testOldSplitsGen() throws Exception {
        DataSourceDescriptor dataSource = this.generateDataSourceDescriptorMapRed(oldSplitsDir);
        Assert.assertTrue((boolean)dataSource.getAdditionalLocalFiles().containsKey("job.split"));
        Assert.assertTrue((boolean)dataSource.getAdditionalLocalFiles().containsKey("job.splitmetainfo"));
        RemoteIterator files = remoteFs.listFiles(oldSplitsDir, false);
        boolean foundSplitsFile = false;
        boolean foundMetaFile = false;
        int totalFilesFound = 0;
        while (files.hasNext()) {
            LocatedFileStatus status = (LocatedFileStatus)files.next();
            String fName = status.getPath().getName();
            ++totalFilesFound;
            if (fName.equals("job.split")) {
                foundSplitsFile = true;
            } else if (fName.equals("job.splitmetainfo")) {
                foundMetaFile = true;
            } else {
                Assert.fail((String)("Found invalid file in splits dir, filename=" + fName));
            }
            Assert.assertTrue((status.getLen() > 0L ? 1 : 0) != 0);
        }
        Assert.assertEquals((long)2L, (long)totalFilesFound);
        Assert.assertTrue((boolean)foundSplitsFile);
        Assert.assertTrue((boolean)foundMetaFile);
        this.verifyLocationHints(oldSplitsDir, dataSource.getLocationHint().getTaskLocationHints());
    }

    @Test(timeout=5000L)
    public void testInputSplitLocalResourceCreation() throws Exception {
        DataSourceDescriptor dataSource = this.generateDataSourceDescriptorMapRed(oldSplitsDir);
        Map localResources = dataSource.getAdditionalLocalFiles();
        Assert.assertEquals((long)2L, (long)localResources.size());
        Assert.assertTrue((boolean)localResources.containsKey("job.split"));
        Assert.assertTrue((boolean)localResources.containsKey("job.splitmetainfo"));
    }

    @Test
    public void testInputEventSerializedPayload() throws IOException {
        MRRuntimeProtos.MRSplitProto proto = MRRuntimeProtos.MRSplitProto.newBuilder().setSplitBytes(ByteString.copyFrom((byte[])"splits".getBytes())).build();
        InputDataInformationEvent initEvent = InputDataInformationEvent.createWithSerializedPayload((int)0, (ByteBuffer)proto.toByteString().asReadOnlyByteBuffer());
        MRRuntimeProtos.MRSplitProto protoFromEvent = MRInputHelpers.getProto((InputDataInformationEvent)initEvent, (JobConf)new JobConf(conf));
        Assert.assertEquals((Object)proto, (Object)protoFromEvent);
    }

    @Test
    public void testInputEventSerializedPath() throws IOException {
        MRRuntimeProtos.MRSplitProto proto = MRRuntimeProtos.MRSplitProto.newBuilder().setSplitBytes(ByteString.copyFrom((byte[])"splits".getBytes())).build();
        Path splitsDir = localFs.resolvePath(localTestRootDir);
        Path serializedPath = new Path(splitsDir + "/" + "splitpayload");
        try (FSDataOutputStream out = localFs.create(serializedPath);){
            proto.writeTo((OutputStream)out);
        }
        Assert.assertTrue((String)"Event file should be present on fs", (boolean)localFs.exists(serializedPath));
        InputDataInformationEvent initEvent = InputDataInformationEvent.createWithSerializedPath((int)0, (String)serializedPath.toUri().toString());
        MRRuntimeProtos.MRSplitProto protoFromEvent = MRInputHelpers.getProto((InputDataInformationEvent)initEvent, (JobConf)new JobConf(conf));
        Assert.assertEquals((Object)proto, (Object)protoFromEvent);
        Assert.assertFalse((String)"Event file should be deleted after read", (boolean)localFs.exists(serializedPath));
    }

    private void verifyLocationHints(Path inputSplitsDir, List<TaskLocationHint> actual) throws Exception {
        JobID jobId = new JobID("dummy", 1);
        JobSplit.TaskSplitMetaInfo[] splitsInfo = SplitMetaInfoReader.readSplitMetaInfo((JobID)jobId, (FileSystem)remoteFs, (Configuration)conf, (Path)inputSplitsDir);
        int splitsCount = splitsInfo.length;
        ArrayList<TaskLocationHint> locationHints = new ArrayList<TaskLocationHint>(splitsCount);
        for (int i = 0; i < splitsCount; ++i) {
            locationHints.add(TaskLocationHint.createTaskLocationHint(new HashSet<String>(Arrays.asList(splitsInfo[i].getLocations())), null));
        }
        Assert.assertEquals(locationHints, actual);
    }

    private DataSourceDescriptor generateDataSourceDescriptorMapReduce(Path inputSplitsDir) throws Exception {
        JobConf jobConf = new JobConf(dfsCluster.getFileSystem().getConf());
        jobConf.setUseNewMapper(true);
        jobConf.setClass("mapreduce.job.inputformat.class", org.apache.hadoop.mapreduce.lib.input.TextInputFormat.class, InputFormat.class);
        jobConf.set("mapreduce.input.fileinputformat.inputdir", testFilePath.toString());
        return MRInputHelpers.configureMRInputWithLegacySplitGeneration((Configuration)jobConf, (Path)inputSplitsDir, (boolean)true);
    }

    private DataSourceDescriptor generateDataSourceDescriptorMapRed(Path inputSplitsDir) throws Exception {
        JobConf jobConf = new JobConf(dfsCluster.getFileSystem().getConf());
        jobConf.setUseNewMapper(false);
        jobConf.setInputFormat(TextInputFormat.class);
        jobConf.set("mapreduce.input.fileinputformat.inputdir", testFilePath.toString());
        return MRInputHelpers.configureMRInputWithLegacySplitGeneration((Configuration)jobConf, (Path)inputSplitsDir, (boolean)true);
    }

    @Test(timeout=5000L)
    public void testInputSplitLocalResourceCreationWithDifferentFS() throws Exception {
        Path splitsDir = localFs.resolvePath(localTestRootDir);
        DataSourceDescriptor dataSource = this.generateDataSourceDescriptorMapRed(splitsDir);
        Map localResources = dataSource.getAdditionalLocalFiles();
        Assert.assertEquals((long)2L, (long)localResources.size());
        Assert.assertTrue((boolean)localResources.containsKey("job.split"));
        Assert.assertTrue((boolean)localResources.containsKey("job.splitmetainfo"));
        for (LocalResource lr : localResources.values()) {
            Assert.assertFalse((boolean)lr.getResource().getScheme().contains(remoteFs.getScheme()));
        }
    }

    @Before
    public void before() throws IOException {
        localFs.mkdirs(localTestRootDir);
    }

    @After
    public void after() throws IOException {
        localFs.delete(localTestRootDir, true);
    }

    static {
        conf = new Configuration();
    }
}

