/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.examples;

import java.io.IOException;
import java.util.HashSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.tez.client.TezClient;
import org.apache.tez.common.Preconditions;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.examples.TezExampleBase;
import org.apache.tez.mapreduce.input.MRInput;
import org.apache.tez.mapreduce.output.MROutput;
import org.apache.tez.mapreduce.processor.SimpleMRProcessor;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.LogicalOutput;
import org.apache.tez.runtime.api.ProcessorContext;
import org.apache.tez.runtime.api.Reader;
import org.apache.tez.runtime.library.api.KeyValueReader;
import org.apache.tez.runtime.library.api.KeyValueWriter;
import org.apache.tez.runtime.library.conf.UnorderedKVEdgeConfig;
import org.apache.tez.runtime.library.conf.UnorderedPartitionedKVEdgeConfig;
import org.apache.tez.runtime.library.partitioner.HashPartitioner;
import org.apache.tez.runtime.library.processor.SimpleProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tez example that joins two text data sets with an in-memory hash join.
 *
 * <p>The DAG built here has three vertices: two {@link ForwardingProcessor} vertices (one per
 * input path) and one {@link HashJoinProcessor} vertex fed by both. The joiner first collects
 * every key arriving from the hash side into a set and then emits each streaming-side key that
 * is present in that set.
 *
 * <p>Arguments: {@code <file1> <file2> <numPartitions> <outPath> [doBroadcast]}, where
 * {@code file1} is used as the streaming side and {@code file2} as the hash side.
 */
public class HashJoinExample extends TezExampleBase {
    private static final Logger LOG = LoggerFactory.getLogger(HashJoinExample.class);

    /** Exact text of the optional fifth argument that selects a broadcast edge for the hash side. */
    private static final String BROADCAST_OPTION = "doBroadcast";
    /** Vertex name for the streaming side; also the input name the joiner looks it up by. */
    private static final String STREAMING_SIDE = "streamingSide";
    /** Vertex name for the hash side; also the input name the joiner looks it up by. */
    private static final String HASH_SIDE = "hashSide";
    /** Data source name attached to both forwarding vertices. */
    private static final String INPUT_FILE = "inputFile";
    /** Vertex name of the join vertex. */
    private static final String JOINER = "joiner";
    /** Data sink name attached to the join vertex. */
    private static final String JOIN_OUTPUT = "joinOutput";
    /** Base name of the DAG. */
    private static final String DAG_NAME = "HashJoinExample";
    /** Appended to {@link #DAG_NAME} when the broadcast variant is requested. */
    private static final String BROADCAST_SUFFIX = "-WithBroadcast";

    /**
     * Command-line entry point: runs the example through {@link ToolRunner} and exits the JVM
     * with the resulting status.
     *
     * @param args command-line arguments
     * @throws Exception if the tool run fails
     */
    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new Configuration(), new HashJoinExample(), args);
        System.exit(exitCode);
    }

    /** Prints the expected argument list to standard error. */
    @Override
    protected void printUsage() {
        System.err.println(
            "Usage: hashjoin <file1> <file2> <numPartitions> <outPath> ["
                + BROADCAST_OPTION + "(default false)]");
    }

    /**
     * Parses the arguments, checks the output location and partition count, then builds and runs
     * the join DAG.
     *
     * @param args      {@code [streamInput, hashInput, numPartitions, output, (doBroadcast)]}
     * @param tezConf   configuration used to resolve the file system and to build the DAG
     * @param tezClient not used directly by this method
     * @return {@code 3} if the output path already exists, {@code 4} if the partition count is
     *         not positive, otherwise whatever {@code runDag} returns
     * @throws Exception on file-system or DAG execution errors; a non-numeric partition count
     *                   surfaces as {@link NumberFormatException}
     */
    @Override
    protected int runJob(String[] args, TezConfiguration tezConf, TezClient tezClient) throws Exception {
        // Broadcast is enabled only by an exact match of the fifth argument.
        boolean doBroadcast = false;
        if (args.length == 5) {
            doBroadcast = args[4].equals(BROADCAST_OPTION);
        }
        LOG.info("Running {}", dagName(doBroadcast));

        int numPartitions = Integer.parseInt(args[2]);
        Path streamInputPath = new Path(args[0]);
        Path hashInputPath = new Path(args[1]);
        String outputDir = args[3];

        // Qualify the output path against its own file system before the existence check.
        Path unqualifiedOutput = new Path(outputDir);
        FileSystem fs = unqualifiedOutput.getFileSystem(tezConf);
        Path outputPath = fs.makeQualified(unqualifiedOutput);

        if (fs.exists(outputPath)) {
            System.err.println("Output directory: " + outputDir + " already exists");
            return 3;
        }
        if (numPartitions <= 0) {
            System.err.println("NumPartitions must be > 0");
            return 4;
        }

        DAG dag = createDag(tezConf, streamInputPath, hashInputPath, outputPath, numPartitions, doBroadcast);
        return runDag(dag, isCountersLog(), LOG);
    }

    /**
     * Accepts exactly four or five positional arguments.
     *
     * @param otherArgs the positional arguments to check
     * @return {@code 0} when the count is acceptable, {@code 2} otherwise
     */
    @Override
    protected int validateArgs(String[] otherArgs) {
        boolean acceptableCount = otherArgs.length == 4 || otherArgs.length == 5;
        return acceptableCount ? 0 : 2;
    }

    /**
     * Returns the DAG name, with the broadcast suffix appended when requested.
     */
    private static String dagName(boolean doBroadcast) {
        return doBroadcast ? DAG_NAME + BROADCAST_SUFFIX : DAG_NAME;
    }

    /**
     * Assembles the three-vertex join DAG.
     *
     * @param tezConf       base configuration copied into each input/output and edge config
     * @param streamPath    input path for the streaming side
     * @param hashPath      input path for the hash side
     * @param outPath       output path for the join result
     * @param numPartitions parallelism given to the join vertex
     * @param doBroadcast   whether the hash side reaches the joiner over a broadcast edge
     * @return the assembled DAG
     * @throws IOException declared for callers; propagated from the builders if they raise it
     */
    private DAG createDag(TezConfiguration tezConf, Path streamPath, Path hashPath, Path outPath, int numPartitions, boolean doBroadcast) throws IOException {
        Vertex hashFileVertex = createForwardingVertex(HASH_SIDE, hashPath, tezConf);
        Vertex streamFileVertex = createForwardingVertex(STREAMING_SIDE, streamPath, tezConf);

        Vertex joinVertex = Vertex
            .create(JOINER, ProcessorDescriptor.create(HashJoinProcessor.class.getName()), numPartitions)
            .addDataSink(
                JOIN_OUTPUT,
                MROutput.createConfigBuilder(new Configuration(tezConf), TextOutputFormat.class, outPath.toUri().toString())
                    .build());

        // Text keys with NullWritable values, distributed by HashPartitioner.
        UnorderedPartitionedKVEdgeConfig partitionedConf = UnorderedPartitionedKVEdgeConfig
            .newBuilder(Text.class.getName(), NullWritable.class.getName(), HashPartitioner.class.getName())
            .setFromConfiguration(tezConf)
            .build();

        Edge streamEdge = Edge.create(streamFileVertex, joinVertex, partitionedConf.createDefaultEdgeProperty());
        Edge hashEdge = Edge.create(hashFileVertex, joinVertex, hashSideEdgeProperty(tezConf, partitionedConf, doBroadcast));

        return DAG.create(dagName(doBroadcast))
            .addVertex(streamFileVertex)
            .addVertex(hashFileVertex)
            .addVertex(joinVertex)
            .addEdge(streamEdge)
            .addEdge(hashEdge);
    }

    /**
     * Builds a {@link ForwardingProcessor} vertex that reads {@code inputPath} through
     * {@link TextInputFormat}, honouring the split-grouping and split-generation settings
     * exposed by the base class.
     */
    private Vertex createForwardingVertex(String vertexName, Path inputPath, TezConfiguration tezConf) throws IOException {
        return Vertex
            .create(vertexName, ProcessorDescriptor.create(ForwardingProcessor.class.getName()))
            .addDataSource(
                INPUT_FILE,
                MRInput.createConfigBuilder(new Configuration(tezConf), TextInputFormat.class, inputPath.toUri().toString())
                    .groupSplits(!isDisableSplitGrouping())
                    .generateSplitsInAM(!isGenerateSplitInClient())
                    .build());
    }

    /**
     * Chooses the edge property for the hash side: a broadcast edge built from a fresh
     * unordered KV config when {@code doBroadcast} is set, otherwise the same partitioned
     * edge type the streaming side uses.
     */
    private static EdgeProperty hashSideEdgeProperty(TezConfiguration tezConf, UnorderedPartitionedKVEdgeConfig partitionedConf, boolean doBroadcast) {
        if (!doBroadcast) {
            return partitionedConf.createDefaultEdgeProperty();
        }
        UnorderedKVEdgeConfig broadcastConf = UnorderedKVEdgeConfig
            .newBuilder(Text.class.getName(), NullWritable.class.getName())
            .setFromConfiguration(tezConf)
            .build();
        return broadcastConf.createDefaultBroadcastEdgeProperty();
    }

    /**
     * Copies the value of every input record to the single output, using it as the output key
     * paired with a {@link NullWritable} value.
     */
    public static class ForwardingProcessor extends SimpleProcessor {
        public ForwardingProcessor(ProcessorContext context) {
            super(context);
        }

        /**
         * Requires exactly one input and one output, then forwards every record.
         *
         * @throws Exception if a precondition fails or reading/writing fails
         */
        public void run() throws Exception {
            Preconditions.checkState(getInputs().size() == 1);
            Preconditions.checkState(getOutputs().size() == 1);

            LogicalInput onlyInput = getInputs().values().iterator().next();
            Reader untypedReader = onlyInput.getReader();
            Preconditions.checkState(untypedReader instanceof KeyValueReader);

            LogicalOutput onlyOutput = getOutputs().values().iterator().next();
            KeyValueReader kvReader = (KeyValueReader) untypedReader;
            KeyValueWriter kvWriter = (KeyValueWriter) onlyOutput.getWriter();

            while (kvReader.next()) {
                // The record value becomes the downstream key; no value payload is carried.
                kvWriter.write(kvReader.getCurrentValue(), NullWritable.get());
            }
        }
    }

    /**
     * Joins the streaming input against the hash input: every hash-side key is held in memory,
     * and each streaming-side key found in that set is written to the output.
     */
    public static class HashJoinProcessor extends SimpleMRProcessor {
        public HashJoinProcessor(ProcessorContext context) {
            super(context);
        }

        /**
         * Requires exactly two inputs and one output, builds the key set from the hash side,
         * then filters the streaming side against it.
         *
         * @throws Exception if a precondition fails or reading/writing fails
         */
        public void run() throws Exception {
            Preconditions.checkState(getInputs().size() == 2);
            Preconditions.checkState(getOutputs().size() == 1);

            // Inputs are looked up by the names of the upstream vertices.
            LogicalInput streamSource = getInputs().get(STREAMING_SIDE);
            LogicalInput hashSource = getInputs().get(HASH_SIDE);
            Reader untypedStreamReader = streamSource.getReader();
            Reader untypedHashReader = hashSource.getReader();
            Preconditions.checkState(untypedStreamReader instanceof KeyValueReader);
            Preconditions.checkState(untypedHashReader instanceof KeyValueReader);

            LogicalOutput sink = getOutputs().get(JOIN_OUTPUT);
            Preconditions.checkState(sink.getWriter() instanceof KeyValueWriter);
            KeyValueWriter kvWriter = (KeyValueWriter) sink.getWriter();

            // Build phase: copy each key into a new Text before storing it in the set.
            KeyValueReader hashReader = (KeyValueReader) untypedHashReader;
            HashSet<Text> hashSideKeys = new HashSet<Text>();
            while (hashReader.next()) {
                Text current = (Text) hashReader.getCurrentKey();
                hashSideKeys.add(new Text(current));
            }

            // Probe phase: emit only the streaming keys that also appeared on the hash side.
            KeyValueReader streamReader = (KeyValueReader) untypedStreamReader;
            while (streamReader.next()) {
                Text candidate = (Text) streamReader.getCurrentKey();
                if (hashSideKeys.contains(candidate)) {
                    kvWriter.write(candidate, NullWritable.get());
                }
            }
        }
    }
}

