package cascading.flow.hadoop;

import cascading.CascadingException;
import cascading.flow.FlowElement;
import cascading.flow.FlowException;
import cascading.flow.FlowNode;
import cascading.flow.FlowProcess;
import cascading.flow.hadoop.planner.HadoopFlowStepJob;
import cascading.flow.hadoop.util.HadoopMRUtil;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.flow.planner.BaseFlowStep;
import cascading.flow.planner.FlowStepJob;
import cascading.flow.planner.graph.ElementGraph;
import cascading.flow.planner.process.FlowNodeGraph;
import cascading.flow.planner.process.ProcessEdge;
import cascading.management.state.ClientState;
import cascading.pipe.CoGroup;
import cascading.tap.Tap;
import cascading.tap.hadoop.io.MultiInputFormat;
import cascading.tap.hadoop.util.Hadoop18TapUtil;
import cascading.tap.hadoop.util.TempHfs;
import cascading.tuple.Fields;
import cascading.tuple.hadoop.TupleSerialization;
import cascading.tuple.hadoop.util.CoGroupingComparator;
import cascading.tuple.hadoop.util.CoGroupingPartitioner;
import cascading.tuple.hadoop.util.GroupingComparator;
import cascading.tuple.hadoop.util.GroupingPartitioner;
import cascading.tuple.hadoop.util.GroupingSortingComparator;
import cascading.tuple.hadoop.util.GroupingSortingPartitioner;
import cascading.tuple.hadoop.util.IndexTupleCoGroupingComparator;
import cascading.tuple.hadoop.util.ReverseGroupingSortingComparator;
import cascading.tuple.hadoop.util.ReverseTupleComparator;
import cascading.tuple.hadoop.util.TupleComparator;
import cascading.tuple.io.KeyIndexTuple;
import cascading.tuple.io.KeyTuple;
import cascading.tuple.io.TuplePair;
import cascading.tuple.io.ValueIndexTuple;
import cascading.tuple.io.ValueTuple;
import cascading.util.ProcessLogger;
import cascading.util.Util;
import cascading.util.Version;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;

/* loaded from: input_file:cascading/flow/hadoop/HadoopFlowStep.class */
/**
 * Flow step that runs as a single Hadoop MapReduce job (old {@code mapred} API).
 * <p>
 * {@link #createInitializedConfig(FlowProcess, JobConf)} derives the job's {@link JobConf} from the step's
 * sources, sink, traps and optional grouping. {@link #clean(JobConf)} removes the temporary sink, any step
 * state that was spilled to the distributed cache, temporary intermediate sink data and tap meta-data.
 */
public class HadoopFlowStep extends BaseFlowStep<JobConf> {
    /**
     * Combined length (in chars) of the packed map and reduce node state at or above which the state is written
     * to the distributed cache instead of being stored inline in the JobConf. Not applied in Hadoop local mode.
     */
    private static final int MAX_INLINE_STATE_LENGTH = Short.MAX_VALUE;

    /**
     * JobConf keys whose values are paths of step state written to the distributed cache.
     * The node map/reduce keys are set by {@link #createInitializedConfig(FlowProcess, JobConf)} when the packed
     * state is too large to inline. {@code cascading.flow.step.path} is not set by this class, but it is still
     * honoured, as it was previously the only key checked during cleanup.
     */
    private static final String[] STEP_STATE_PATH_KEYS = {
        "cascading.flow.step.path",
        "cascading.flow.step.node.map.path",
        "cascading.flow.step.node.reduce.path"
    };

    /**
     * Creates a step without an element graph.
     *
     * @param str passed to {@code BaseFlowStep(String, int)} — presumably the step name; confirm in BaseFlowStep
     * @param i   passed to {@code BaseFlowStep(String, int)} — presumably the step ordinal; confirm in BaseFlowStep
     */
    protected HadoopFlowStep(String str, int i) {
        super(str, i);
    }

    /**
     * Creates a step for the given element graph and its flow node graph.
     *
     * @param elementGraph  the elements executed by this step
     * @param flowNodeGraph the nodes (map side first, optional reduce side second) of this step
     */
    public HadoopFlowStep(ElementGraph elementGraph, FlowNodeGraph flowNodeGraph) {
        super(elementGraph, flowNodeGraph);
    }

    /**
     * Returns this step's configuration converted by {@link HadoopUtil#createProperties(Configuration)}.
     *
     * @return the step configuration as a property map
     */
    public Map<Object, Object> getConfigAsProperties() {
        return HadoopUtil.createProperties((Configuration) getConfig());
    }

    /**
     * Builds the JobConf for this step's MapReduce job.
     * <p>
     * The parent configuration is copied with {@link HadoopUtil#copyJobConf(JobConf)} and all settings are
     * applied to the copy: mapper/reducer runners, tuple serialization, source/sink/trap initialization,
     * task counts, shuffle (partitioner/comparator) classes, node field metadata and the packed node state.
     *
     * @param flowProcess the flow process used to initialize taps and copy configurations
     * @param jobConf     the parent configuration, or {@code null} to start from a new {@link JobConf}
     * @return the initialized JobConf
     * @throws FlowException if the step groups but no number of gather partitions (reducers) can be determined
     */
    @Override
    public JobConf createInitializedConfig(FlowProcess<JobConf> flowProcess, JobConf jobConf) {
        JobConf conf = jobConf == null ? new JobConf() : HadoopUtil.copyJobConf(jobConf);

        conf.setBoolean("mapred.used.genericoptionsparser", true);
        conf.setJobName(getStepDisplayName(conf.getInt("cascading.display.id.truncate", Util.ID_LENGTH)));
        conf.setOutputKeyClass(KeyTuple.class);
        conf.setOutputValueClass(ValueTuple.class);
        conf.setMapRunnerClass(FlowMapper.class);
        conf.setReducerClass(FlowReducer.class);

        TupleSerialization.setSerializations(conf);

        initFromSources(flowProcess, conf);
        initFromSink(flowProcess, conf);
        initFromTraps(flowProcess, conf);
        initFromStepConfigDef(conf);

        configureTaskCounts(conf);

        // the first (map -> reduce) edge of the node graph; may be null for a map-only step
        ProcessEdge processEdge = (ProcessEdge) Util.getFirst(getFlowNodeGraph().edgeSet());

        configureShuffle(conf, processEdge);

        if (processEdge != null
            && ifCoGroupAndKeysHaveCommonTypes(this, processEdge.getFlowElement(), processEdge.getResolvedKeyFields())) {
            conf.set("cascading.node.ordinals", Util.join(processEdge.getSinkExpectedOrdinals(), ","));
            HadoopUtil.addFields(conf, "cascading.node.key.fields", processEdge.getResolvedKeyFields());
            HadoopUtil.addFields(conf, "cascading.node.sort.fields", processEdge.getResolvedSortFields());
            HadoopUtil.addFields(conf, "cascading.node.value.fields", processEdge.getResolvedValueFields());
        }

        String release = Version.getRelease();
        if (release != null) {
            conf.set("cascading.version", release);
        }

        conf.set("cascading.flow.step.id", getID());
        conf.set("cascading.flow.step.num", Integer.toString(getOrdinal()));

        HadoopUtil.setIsInflow(conf);

        // packed last, so the node state reflects everything configured above
        configureNodeStates(conf);

        return conf;
    }

    /**
     * Sets the number of tasks for the job.
     * <p>
     * If the sink scheme requests a fixed number of parts, that count is applied to reducers when the step
     * groups, otherwise to mappers. Without a requested count, a grouping step uses the configured reducer
     * count, falling back to {@code cascading.flow.runtime.gather.partitions.num}.
     *
     * @throws FlowException if the step groups and neither source yields a non-zero reducer count
     */
    private void configureTaskCounts(JobConf conf) {
        int numSinkParts = getSink().getScheme().getNumSinkParts();

        if (numSinkParts != 0) {
            if (getGroup() != null) {
                conf.setNumReduceTasks(numSinkParts);
            } else {
                conf.setNumMapTasks(numSinkParts);
            }
            return;
        }

        if (getGroup() == null) {
            return;
        }

        int numReduceTasks = conf.getNumReduceTasks();
        if (numReduceTasks == 0) {
            numReduceTasks = conf.getInt("cascading.flow.runtime.gather.partitions.num", 0);
        }
        if (numReduceTasks == 0) {
            throw new FlowException(getName(), "a default number of gather partitions must be set, see FlowRuntimeProps");
        }
        conf.setNumReduceTasks(numReduceTasks);
    }

    /**
     * Configures map output types, partitioner and key/grouping comparators according to the step's grouping.
     * A step without a grouping is made map-only (zero reducers).
     *
     * @param processEdge the first edge of the node graph; dereferenced only when the step has a grouping
     */
    private void configureShuffle(JobConf conf, ProcessEdge processEdge) {
        conf.setOutputKeyComparatorClass(TupleComparator.class);

        if (getGroup() == null) {
            conf.setNumReduceTasks(0);
            return;
        }

        conf.setMapOutputKeyClass(KeyTuple.class);
        conf.setMapOutputValueClass(ValueTuple.class);
        conf.setPartitionerClass(GroupingPartitioner.class);

        if (getGroup().isSortReversed()) {
            conf.setOutputKeyComparatorClass(ReverseTupleComparator.class);
        }

        // key/sort fields are looked up for the first expected sink ordinal of the edge
        Integer ordinal = (Integer) Util.getFirst(processEdge.getSinkExpectedOrdinals());
        HadoopUtil.addComparators(conf, "cascading.group.comparator", getGroup().getKeySelectors(), (Fields) processEdge.getResolvedKeyFields().get(ordinal));

        if (getGroup().isGroupBy()) {
            HadoopUtil.addComparators(conf, "cascading.sort.comparator", getGroup().getSortingSelectors(), (Fields) processEdge.getResolvedSortFields().get(ordinal));
        } else {
            // co-group: keys and values carry the ordinal (index) of the pipe they came from
            conf.setPartitionerClass(CoGroupingPartitioner.class);
            conf.setMapOutputKeyClass(KeyIndexTuple.class);
            conf.setMapOutputValueClass(ValueIndexTuple.class);
            conf.setOutputKeyComparatorClass(IndexTupleCoGroupingComparator.class);
            conf.setOutputValueGroupingComparator(CoGroupingComparator.class);
        }

        if (getGroup().isSorted()) {
            // secondary sort: the map output key is a grouping/sorting pair, grouped on the grouping half only
            conf.setPartitionerClass(GroupingSortingPartitioner.class);
            conf.setMapOutputKeyClass(TuplePair.class);
            if (getGroup().isSortReversed()) {
                conf.setOutputKeyComparatorClass(ReverseGroupingSortingComparator.class);
            } else {
                conf.setOutputKeyComparatorClass(GroupingSortingComparator.class);
            }
            conf.setOutputValueGroupingComparator(GroupingComparator.class);
        }
    }

    /**
     * Packs the map-side node and, if present, the reduce-side node into the JobConf.
     * <p>
     * The packed state is stored inline when running in Hadoop local mode or when its combined length is below
     * {@link #MAX_INLINE_STATE_LENGTH}; otherwise it is written via {@link HadoopMRUtil#writeStateToDistCache}
     * and only the returned paths are stored (these paths are removed again by {@link #clean(JobConf)}).
     */
    private void configureNodeStates(JobConf conf) {
        Iterator nodes = getFlowNodeGraph().getTopologicalIterator();
        FlowNode mapNode = (FlowNode) nodes.next();
        FlowNode reduceNode = nodes.hasNext() ? (FlowNode) nodes.next() : null;

        if (reduceNode != null) {
            // record the final reducer count on the reduce-side node
            reduceNode.addProcessAnnotation("cascading.flow.runtime.gather.partitions.num", Integer.toString(conf.getNumReduceTasks()));
        }

        String mapState = HadoopUtil.pack(mapNode, conf);
        String reduceState = HadoopUtil.pack(reduceNode, conf);
        int combinedLength = mapState.length() + reduceState.length();

        if (isHadoopLocalMode(conf) || combinedLength < MAX_INLINE_STATE_LENGTH) {
            conf.set("cascading.flow.step.node.map", mapState);
            if (!Util.isEmpty(reduceState)) {
                conf.set("cascading.flow.step.node.reduce", reduceState);
            }
        } else {
            conf.set("cascading.flow.step.node.map.path", HadoopMRUtil.writeStateToDistCache(conf, getID(), "map", mapState));
            if (!Util.isEmpty(reduceState)) {
                conf.set("cascading.flow.step.node.reduce.path", HadoopMRUtil.writeStateToDistCache(conf, getID(), "reduce", reduceState));
            }
        }
    }

    /**
     * Decides whether per-ordinal node field metadata may be written for an edge.
     * <p>
     * Returns {@code true} unless the element is a {@link CoGroup} with at least two key field sets whose
     * {@link Fields#getTypesClasses()} differ; in that case a warning is logged and {@code false} is returned.
     *
     * @param logger             receives the mismatch warning
     * @param flowElement        the element at the edge
     * @param keyFieldsByOrdinal resolved key fields, keyed by ordinal; may be {@code null}
     * @return {@code false} only for a CoGroup with mismatched key types
     */
    private static boolean ifCoGroupAndKeysHaveCommonTypes(ProcessLogger logger, FlowElement flowElement, Map<Integer, Fields> keyFieldsByOrdinal) {
        if (!(flowElement instanceof CoGroup) || keyFieldsByOrdinal == null || keyFieldsByOrdinal.size() < 2) {
            return true;
        }

        Iterator<Map.Entry<Integer, Fields>> entries = keyFieldsByOrdinal.entrySet().iterator();
        Fields first = entries.next().getValue();

        while (entries.hasNext()) {
            Fields other = entries.next().getValue();
            if (!Arrays.equals(first.getTypesClasses(), other.getTypesClasses())) {
                logger.logWarn("unable to perform: {}, on mismatched join types and optimize serialization with type exclusion, fields: {} & {}", new Object[]{flowElement, first, other});
                return false;
            }
        }

        return true;
    }

    /**
     * Returns whether the given configuration targets Hadoop local mode, as decided by {@link HadoopUtil#isLocal}.
     *
     * @param jobConf the configuration to inspect
     * @return {@code true} in local mode
     */
    public boolean isHadoopLocalMode(JobConf jobConf) {
        return HadoopUtil.isLocal(jobConf);
    }

    /**
     * Creates the job wrapper that submits and monitors this step.
     * <p>
     * A {@link NoClassDefFoundError} while constructing the job is logged with platform details (to help diagnose
     * a Hadoop version mismatch) and then rethrown.
     */
    @Override
    protected FlowStepJob<JobConf> createFlowStepJob(ClientState clientState, FlowProcess<JobConf> flowProcess, JobConf jobConf) {
        try {
            return new HadoopFlowStepJob(clientState, this, jobConf);
        } catch (NoClassDefFoundError error) {
            logError(String.format("unable to load platform specific class, please verify Hadoop cluster version: '%s', matches the Hadoop platform build dependency and associated FlowConnector, cascading-hadoop or cascading-hadoop2-mr1", HadoopUtil.getPlatformInfo(JobConf.class, "org/apache/hadoop", "Hadoop MR").toString()), error);
            throw error;
        }
    }

    /**
     * Removes resources left behind by this step's job.
     * <p>
     * Cleanup is best-effort: failures are logged as warnings and do not stop the remaining cleanup. It covers
     * <ul>
     *   <li>step state written to the distributed cache (every path listed under {@link #STEP_STATE_PATH_KEYS}),</li>
     *   <li>the temporary sink, if one was created by {@link #initFromSink(FlowProcess, JobConf)},</li>
     *   <li>intermediate data / meta-data of every sink tap, and meta-data of every trap.</li>
     * </ul>
     *
     * @param jobConf the configuration the step was run with
     */
    public void clean(JobConf jobConf) {
        // previously only the legacy key was checked, so state spilled under the node map/reduce keys was leaked
        for (String key : STEP_STATE_PATH_KEYS) {
            String statePath = jobConf.get(key);
            if (statePath == null) {
                continue;
            }
            try {
                HadoopUtil.removeStateFromDistCache(jobConf, statePath);
            } catch (IOException exception) {
                logWarn("unable to remove step state file: " + statePath, exception);
            }
        }

        if (this.tempSink != null) {
            try {
                this.tempSink.deleteResource(jobConf);
            } catch (Exception exception) {
                logWarn("unable to remove temporary file: " + this.tempSink, exception);
            }
        }

        for (Tap sinkTap : getSinkTaps()) {
            cleanIntermediateData(jobConf, sinkTap);
        }

        for (Tap trap : getTraps()) {
            cleanTapMetaData(jobConf, trap);
        }
    }

    /**
     * Deletes a temporary sink tap's data, or otherwise only cleans its meta-data.
     * <p>
     * A temporary tap is deleted when the flow succeeded or has no run ID. If the flow failed and has a run ID,
     * the data is kept (presumably so the run can be resumed — TODO confirm) and only meta-data is cleaned.
     *
     * @param jobConf the step configuration
     * @param tap     a sink tap of this step
     */
    protected void cleanIntermediateData(JobConf jobConf, Tap tap) {
        boolean deleteData = tap.isTemporary()
            && (getFlow().getFlowStats().isSuccessful() || getFlow().getRunID() == null);

        if (!deleteData) {
            cleanTapMetaData(jobConf, tap);
            return;
        }

        try {
            tap.deleteResource(jobConf);
        } catch (Exception exception) {
            logWarn("unable to remove temporary file: " + tap, exception);
        }
    }

    /** Best-effort cleanup of a tap's meta-data via {@link Hadoop18TapUtil}; failures are logged, not thrown. */
    private void cleanTapMetaData(JobConf jobConf, Tap tap) {
        try {
            Hadoop18TapUtil.cleanupTapMetaData(jobConf, tap);
        } catch (IOException exception) {
            logWarn("unable to clean tap meta-data: " + tap, exception);
        }
    }

    /**
     * Runs sink initialization for every trap against a copy of {@code jobConf} made by
     * {@link HadoopUtil#copyJobConf(JobConf)}. The copy is discarded, so trap settings are not written into
     * {@code jobConf}.
     */
    private void initFromTraps(FlowProcess<JobConf> flowProcess, JobConf jobConf, Map<String, Tap> trapMap) {
        if (trapMap.isEmpty()) {
            return;
        }

        JobConf trapConf = HadoopUtil.copyJobConf(jobConf);

        for (Tap trap : trapMap.values()) {
            trap.sinkConfInit(flowProcess, trapConf);
        }
    }

    /**
     * Initializes the job's inputs.
     * <p>
     * Each streamed source is initialized on its own config copy tagged with {@code cascading.step.source}. The
     * copies are then registered through {@link MultiInputFormat#addInputFormat}. Each accumulated source is also
     * initialized on a copy. Its differences from {@code jobConf} are packed into
     * {@code cascading.node.accumulated.source.conf.<tap id>}, and any distributed-cache files it set are copied
     * into {@code jobConf}.
     *
     * @throws IllegalStateException if a streamed source has a {@code null} identifier
     * @throws CascadingException    wrapping an {@link IOException} from reading or setting cache files
     */
    protected void initFromSources(FlowProcess<JobConf> flowProcess, JobConf jobConf) {
        Set<Tap> streamedSources = getUniqueStreamedSources();
        JobConf[] streamedConfs = new JobConf[streamedSources.size()];

        int index = 0;
        for (Tap source : streamedSources) {
            if (source.getIdentifier() == null) {
                throw new IllegalStateException("tap may not have null identifier: " + source.toString());
            }

            JobConf sourceConf = (JobConf) flowProcess.copyConfig(jobConf);
            sourceConf.set("cascading.step.source", Tap.id(source));
            source.sourceConfInit(flowProcess, sourceConf);
            streamedConfs[index++] = sourceConf;
        }

        for (Tap source : getAllAccumulatedSources()) {
            JobConf sourceConf = (JobConf) flowProcess.copyConfig(jobConf);
            source.sourceConfInit(flowProcess, sourceConf);
            jobConf.set("cascading.node.accumulated.source.conf." + Tap.id(source), HadoopUtil.pack(flowProcess.diffConfigIntoMap(jobConf, sourceConf), jobConf));

            try {
                if (DistributedCache.getCacheFiles(sourceConf) != null) {
                    DistributedCache.setCacheFiles(DistributedCache.getCacheFiles(sourceConf), jobConf);
                }
            } catch (IOException exception) {
                throw new CascadingException(exception);
            }
        }

        MultiInputFormat.addInputFormat(jobConf, streamedConfs);
    }

    /** Applies this step's config-def values to {@code jobConf} through a {@link ConfigurationSetter}. */
    private void initFromStepConfigDef(JobConf jobConf) {
        initConfFromStepConfigDef(new ConfigurationSetter(jobConf));
    }

    /**
     * Returns the declared source taps that are not accumulated sources.
     *
     * @return a new mutable set of streamed source taps
     */
    private Set<Tap> getUniqueStreamedSources() {
        Set<Tap> streamed = new HashSet<Tap>(this.sources.keySet());
        streamed.removeAll(getAllAccumulatedSources());
        return streamed;
    }

    /**
     * Initializes the job's output from the sink.
     * <p>
     * If no output path is set afterwards and the output format is unset or a {@link FileOutputFormat}, a
     * {@link TempHfs} named after the sink identifier's path is created as {@code tempSink} and initialized as
     * the sink. Note: that path lookup dereferences {@code getSink()} even though the first call is null-guarded.
     */
    protected void initFromSink(FlowProcess<JobConf> flowProcess, JobConf jobConf) {
        if (getSink() != null) {
            getSink().sinkConfInit(flowProcess, jobConf);
        }

        Class<? extends OutputFormat> outputFormat = jobConf.getClass("mapred.output.format.class", null, OutputFormat.class);
        boolean fileBasedOrUnset = outputFormat == null || FileOutputFormat.class.isAssignableFrom(outputFormat);

        if (FileOutputFormat.getOutputPath(jobConf) == null && fileBasedOrUnset) {
            this.tempSink = new TempHfs((Configuration) jobConf, "tmp:/" + new Path(getSink().getIdentifier()).toUri().getPath(), true);
        }

        if (this.tempSink != null) {
            this.tempSink.sinkConfInit(flowProcess, jobConf);
        }
    }

    /** Initializes all of this step's traps; see {@link #initFromTraps(FlowProcess, JobConf, Map)}. */
    protected void initFromTraps(FlowProcess<JobConf> flowProcess, JobConf jobConf) {
        initFromTraps(flowProcess, jobConf, getTrapMap());
    }

    // NOTE: the decompiler-emitted synthetic bridge methods for createFlowStepJob/createInitializedConfig were
    // removed: javac generates them itself, and declaring them in source is an erasure name clash.
}
