/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.mapreduce.examples;

import com.google.common.collect.Maps;
import java.io.IOException;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileAlreadyExistsException;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.tez.client.TezClient;
import org.apache.tez.common.Preconditions;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.DataSinkDescriptor;
import org.apache.tez.dag.api.DataSourceDescriptor;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.GroupInputEdge;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.VertexGroup;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.StatusGetOpts;
import org.apache.tez.mapreduce.input.MRInput;
import org.apache.tez.mapreduce.output.MROutput;
import org.apache.tez.mapreduce.processor.SimpleMRProcessor;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.Output;
import org.apache.tez.runtime.api.ProcessorContext;
import org.apache.tez.runtime.library.api.KeyValueReader;
import org.apache.tez.runtime.library.api.KeyValueWriter;
import org.apache.tez.runtime.library.api.KeyValuesReader;
import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig;
import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValuesInput;
import org.apache.tez.runtime.library.partitioner.HashPartitioner;

public class UnionExample {
    private DAG createDAG(FileSystem fs, TezConfiguration tezConf, Map<String, LocalResource> localResources, Path stagingDir, String inputPath, String outputPath) throws IOException {
        DAG dag = DAG.create((String)"UnionExample");
        int numMaps = -1;
        Configuration inputConf = new Configuration((Configuration)tezConf);
        inputConf.setBoolean("mapred.mapper.new-api", false);
        inputConf.set("mapred.input.format.class", TextInputFormat.class.getName());
        inputConf.set("mapreduce.input.fileinputformat.inputdir", inputPath);
        MRInput.MRInputConfigBuilder configurer = MRInput.createConfigBuilder((Configuration)inputConf, null);
        DataSourceDescriptor dataSource = configurer.generateSplitsInAM(false).build();
        Vertex mapVertex1 = Vertex.create((String)"map1", (ProcessorDescriptor)ProcessorDescriptor.create((String)TokenProcessor.class.getName()), (int)numMaps).addDataSource("MRInput", dataSource);
        Vertex mapVertex2 = Vertex.create((String)"map2", (ProcessorDescriptor)ProcessorDescriptor.create((String)TokenProcessor.class.getName()), (int)numMaps).addDataSource("MRInput", dataSource);
        Vertex mapVertex3 = Vertex.create((String)"map3", (ProcessorDescriptor)ProcessorDescriptor.create((String)TokenProcessor.class.getName()), (int)numMaps).addDataSource("MRInput", dataSource);
        Vertex checkerVertex = Vertex.create((String)"checker", (ProcessorDescriptor)ProcessorDescriptor.create((String)UnionProcessor.class.getName()), (int)1);
        Configuration outputConf = new Configuration((Configuration)tezConf);
        outputConf.setBoolean("mapred.reducer.new-api", false);
        outputConf.set("mapred.output.format.class", TextOutputFormat.class.getName());
        outputConf.set("mapreduce.output.fileoutputformat.outputdir", outputPath);
        DataSinkDescriptor od = MROutput.createConfigBuilder((Configuration)outputConf, null).build();
        checkerVertex.addDataSink("union", od);
        Configuration allPartsConf = new Configuration((Configuration)tezConf);
        DataSinkDescriptor od2 = MROutput.createConfigBuilder((Configuration)allPartsConf, TextOutputFormat.class, (String)(outputPath + "-all-parts")).build();
        checkerVertex.addDataSink("all-parts", od2);
        Configuration partsConf = new Configuration((Configuration)tezConf);
        DataSinkDescriptor od1 = MROutput.createConfigBuilder((Configuration)partsConf, TextOutputFormat.class, (String)(outputPath + "-parts")).build();
        VertexGroup unionVertex = dag.createVertexGroup("union", new Vertex[]{mapVertex1, mapVertex2});
        unionVertex.addDataSink("parts", od1);
        OrderedPartitionedKVEdgeConfig edgeConf = OrderedPartitionedKVEdgeConfig.newBuilder((String)Text.class.getName(), (String)IntWritable.class.getName(), (String)HashPartitioner.class.getName()).build();
        dag.addVertex(mapVertex1).addVertex(mapVertex2).addVertex(mapVertex3).addVertex(checkerVertex).addEdge(Edge.create((Vertex)mapVertex3, (Vertex)checkerVertex, (EdgeProperty)edgeConf.createDefaultEdgeProperty())).addEdge(GroupInputEdge.create((VertexGroup)unionVertex, (Vertex)checkerVertex, (EdgeProperty)edgeConf.createDefaultEdgeProperty(), (InputDescriptor)InputDescriptor.create((String)ConcatenatedMergedKeyValuesInput.class.getName())));
        return dag;
    }

    private static void printUsage() {
        System.err.println("Usage:  unionexample <in1> <out1>");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public boolean run(String inputPath, String outputPath, Configuration conf) throws Exception {
        System.out.println("Running UnionExample");
        TezConfiguration tezConf = conf != null ? new TezConfiguration(conf) : new TezConfiguration();
        UserGroupInformation.setConfiguration((Configuration)tezConf);
        String user = UserGroupInformation.getCurrentUser().getShortUserName();
        FileSystem fs = FileSystem.get((Configuration)tezConf);
        String stagingDirStr = "/user/" + user + "/" + ".staging" + "/" + "/" + Long.toString(System.currentTimeMillis());
        Path stagingDir = new Path(stagingDirStr);
        tezConf.set("tez.staging-dir", stagingDirStr);
        stagingDir = fs.makeQualified(stagingDir);
        TezClient tezSession = TezClient.create((String)"UnionExampleSession", (TezConfiguration)tezConf);
        tezSession.start();
        DAGClient dagClient = null;
        try {
            Path outputPathAsPath = new Path(outputPath);
            FileSystem outputFs = outputPathAsPath.getFileSystem((Configuration)tezConf);
            outputPathAsPath = outputFs.makeQualified(outputPathAsPath);
            if (outputFs.exists(outputPathAsPath)) {
                throw new FileAlreadyExistsException("Output directory " + outputPath + " already exists");
            }
            TreeMap<String, LocalResource> localResources = new TreeMap<String, LocalResource>();
            DAG dag = this.createDAG(fs, tezConf, localResources, stagingDir, inputPath, outputPath);
            tezSession.waitTillReady();
            dagClient = tezSession.submitDAG(dag);
            DAGStatus dagStatus = dagClient.waitForCompletionWithStatusUpdates(EnumSet.of(StatusGetOpts.GET_COUNTERS));
            if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
                System.out.println("DAG diagnostics: " + dagStatus.getDiagnostics());
                boolean bl = false;
                return bl;
            }
            boolean bl = true;
            return bl;
        }
        finally {
            fs.delete(stagingDir, true);
            tezSession.stop();
        }
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            UnionExample.printUsage();
            System.exit(2);
        }
        UnionExample job = new UnionExample();
        job.run(args[0], args[1], null);
    }

    public static class TokenProcessor
    extends SimpleMRProcessor {
        IntWritable one = new IntWritable(1);
        Text word = new Text();

        public TokenProcessor(ProcessorContext context) {
            super(context);
        }

        public void run() throws Exception {
            Preconditions.checkArgument((this.getInputs().size() == 1 ? 1 : 0) != 0);
            boolean inUnion = true;
            if (this.getContext().getTaskVertexName().equals("map3")) {
                inUnion = false;
            }
            Preconditions.checkArgument((this.getOutputs().size() == (inUnion ? 2 : 1) ? 1 : 0) != 0);
            Preconditions.checkArgument((boolean)this.getOutputs().containsKey("checker"));
            MRInput input = (MRInput)this.getInputs().values().iterator().next();
            KeyValueReader kvReader = input.getReader();
            Output output = (Output)this.getOutputs().get("checker");
            KeyValueWriter kvWriter = (KeyValueWriter)output.getWriter();
            MROutput parts = null;
            KeyValueWriter partsWriter = null;
            if (inUnion) {
                parts = (MROutput)this.getOutputs().get("parts");
                partsWriter = parts.getWriter();
            }
            while (kvReader.next()) {
                StringTokenizer itr = new StringTokenizer(kvReader.getCurrentValue().toString());
                while (itr.hasMoreTokens()) {
                    this.word.set(itr.nextToken());
                    kvWriter.write((Object)this.word, (Object)this.one);
                    if (!inUnion) continue;
                    partsWriter.write((Object)this.word, (Object)this.one);
                }
            }
        }
    }

    public static class UnionProcessor
    extends SimpleMRProcessor {
        IntWritable one = new IntWritable(1);

        public UnionProcessor(ProcessorContext context) {
            super(context);
        }

        public void run() throws Exception {
            Preconditions.checkArgument((this.getInputs().size() == 2 ? 1 : 0) != 0);
            Preconditions.checkArgument((this.getOutputs().size() == 2 ? 1 : 0) != 0);
            MROutput out = (MROutput)this.getOutputs().get("union");
            MROutput allParts = (MROutput)this.getOutputs().get("all-parts");
            KeyValueWriter kvWriter = out.getWriter();
            KeyValueWriter partsWriter = allParts.getWriter();
            HashMap unionKv = Maps.newHashMap();
            LogicalInput union = (LogicalInput)this.getInputs().get("union");
            KeyValuesReader kvReader = (KeyValuesReader)union.getReader();
            while (kvReader.next()) {
                String word = ((Text)kvReader.getCurrentKey()).toString();
                IntWritable intVal = (IntWritable)kvReader.getCurrentValues().iterator().next();
                for (int i = 0; i < intVal.get(); ++i) {
                    partsWriter.write((Object)word, (Object)this.one);
                }
                AtomicInteger value = (AtomicInteger)unionKv.get(word);
                if (value == null) {
                    unionKv.put(word, new AtomicInteger(intVal.get()));
                    continue;
                }
                value.addAndGet(intVal.get());
            }
            LogicalInput map3 = (LogicalInput)this.getInputs().get("map3");
            kvReader = (KeyValuesReader)map3.getReader();
            while (kvReader.next()) {
                String word = ((Text)kvReader.getCurrentKey()).toString();
                IntWritable intVal = (IntWritable)kvReader.getCurrentValues().iterator().next();
                AtomicInteger value = (AtomicInteger)unionKv.get(word);
                if (value == null) {
                    throw new TezUncheckedException("Expected to exist: " + word);
                }
                value.getAndAdd(intVal.get() * -2);
            }
            for (AtomicInteger value : unionKv.values()) {
                if (value.get() == 0) continue;
                throw new TezUncheckedException("Unexpected non-zero value");
            }
            kvWriter.write((Object)"Union", (Object)new IntWritable(unionKv.size()));
        }
    }
}

