/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hive.ql.optimizer;

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Stack;
import org.apache.hadoop.hive.ql.exec.CommonJoinOperator;
import org.apache.hadoop.hive.ql.exec.LimitOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
import org.apache.hadoop.hive.ql.lib.RuleRegExp;
import org.apache.hadoop.hive.ql.lib.SemanticNodeProcessor;
import org.apache.hadoop.hive.ql.lib.SemanticRule;
import org.apache.hadoop.hive.ql.optimizer.Transform;
import org.apache.hadoop.hive.ql.optimizer.topnkey.TopNKeyProcessor;
import org.apache.hadoop.hive.ql.optimizer.topnkey.TopNKeyPushdownProcessor;
import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.JoinCondDesc;
import org.apache.hadoop.hive.ql.plan.JoinDesc;
import org.apache.hadoop.hive.ql.plan.LimitDesc;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Plan transformation that tries to move {@link LimitOperator}s closer to their inputs when the
 * limit is not tied to an ordering.
 *
 * <p>The operator graph is walked starting from the top operators of the {@link ParseContext}.
 * Every operator matching the rule {@code LimitOperator.getOperatorName() + "%"} is handed to
 * {@link LimitPushDown}, which rewrites the graph in place.
 */
public class OrderlessLimitPushDownOptimizer extends Transform {
    private static final Logger LOG = LoggerFactory.getLogger(OrderlessLimitPushDownOptimizer.class);

    /**
     * Walks the operator graph and applies {@link LimitPushDown} to every limit operator found.
     *
     * @param pctx the parse context whose top operators are the starting points of the walk
     * @return the same {@code pctx} instance that was passed in
     * @throws SemanticException if the graph walk or one of the rewrites fails
     */
    @Override
    public ParseContext transform(ParseContext pctx) throws SemanticException {
        LinkedHashMap<SemanticRule, SemanticNodeProcessor> opRules = new LinkedHashMap<>();
        opRules.put(
            new RuleRegExp("LIMIT push down", LimitOperator.getOperatorName() + "%"),
            new LimitPushDown());
        DefaultGraphWalker walker = new DefaultGraphWalker(new DefaultRuleDispatcher(null, opRules, null));
        walker.startWalking(new ArrayList<Node>(pctx.getTopOps().values()), null);
        return pctx;
    }

    /**
     * Node processor invoked for each limit operator; performs the actual graph rewrites.
     */
    private static class LimitPushDown implements SemanticNodeProcessor {

        /**
         * Pushes the limit {@code nd} unless the closest reduce sink found on {@code stack}
         * reports an order-by.
         *
         * @param nd the matched node; cast to {@link LimitOperator}
         * @param stack the walker's node stack, with {@code nd} expected on top
         * @param procCtx unused
         * @param nodeOutputs unused
         * @return always {@code null}
         * @throws SemanticException if a rewrite fails
         */
        @Override
        public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx, Object... nodeOutputs)
                throws SemanticException {
            ReduceSinkOperator reduceSink = findReduceSink(stack);
            // A limit that follows an ordering reduce sink is order-sensitive and is left alone here.
            if (reduceSink == null || !reduceSink.getConf().hasOrderBy()) {
                pushDown((LimitOperator) nd);
            }
            return null;
        }

        /**
         * Searches the stack, from the entry just below the top towards the bottom, for the first
         * {@link ReduceSinkOperator}.
         *
         * @param stack the walker's node stack
         * @return the first reduce sink found, or {@code null} if there is none
         */
        private ReduceSinkOperator findReduceSink(Stack<Node> stack) {
            // size() - 2 skips the top element (presumably the limit being processed).
            for (int i = stack.size() - 2; i >= 0; --i) {
                Node candidate = stack.get(i);
                if (candidate instanceof ReduceSinkOperator) {
                    return (ReduceSinkOperator) candidate;
                }
            }
            return null;
        }

        /**
         * Dispatches on the type of the limit's first parent and applies the matching rewrite.
         * Nothing is done when that parent has more than one child or is of any other type.
         *
         * @param limit the limit operator to push
         * @throws SemanticException if a rewrite fails
         */
        private void pushDown(LimitOperator limit) throws SemanticException {
            Operator<?> parent = limit.getParentOperators().get(0);
            if (parent.getNumChild() != 1) {
                return;
            }
            switch (parent.getType()) {
                case LIMIT:
                    combineLimits(limit);
                    break;
                case SELECT:
                case FORWARD:
                    pushdownThroughParent(limit);
                    break;
                case MERGEJOIN:
                case JOIN:
                case MAPJOIN:
                    pushThroughLeftOuterJoin(limit);
                    break;
                default:
                    // No rewrite is defined for other parent types.
                    break;
            }
        }

        /**
         * Merges a limit into its parent limit when both report the same offset: the parent keeps
         * the smaller of the two limits, the child is removed, and pushing continues from the
         * parent.
         *
         * @param childLimit the limit whose first parent is also a limit
         * @throws SemanticException if a follow-up rewrite fails
         */
        private void combineLimits(LimitOperator childLimit) throws SemanticException {
            LimitOperator parentLimit = (LimitOperator) childLimit.getParentOperators().get(0);
            LimitDesc parentConf = parentLimit.getConf();
            LimitDesc childConf = childLimit.getConf();
            // NOTE(review): if getOffset() returns a boxed Integer this is a reference comparison;
            // confirm against LimitDesc before changing it, since the merge below keeps only the
            // parent's offset.
            if (parentConf.getOffset() == childConf.getOffset()) {
                int min = Math.min(parentConf.getLimit(), childConf.getLimit());
                LOG.debug("Combining two limits child={}, parent={}, newLimit={}", childLimit, parentLimit, min);
                parentConf.setLimit(min);
                parentLimit.removeChildAndAdoptItsChildren(childLimit);
                pushDown(parentLimit);
            }
        }

        /**
         * Moves the limit past its first parent via {@link TopNKeyPushdownProcessor#moveDown} and
         * then tries to push it further.
         *
         * @param limit the limit operator to move
         * @throws SemanticException if the move or a follow-up rewrite fails
         */
        private void pushdownThroughParent(LimitOperator limit) throws SemanticException {
            Operator<?> parent = limit.getParentOperators().get(0);
            LOG.debug("Pushing {} through {}", limit.getName(), parent.getName());
            TopNKeyPushdownProcessor.moveDown(limit);
            pushDown(limit);
        }

        /**
         * Copies the limit onto the first input of its parent join when every join condition
         * equals the first one and that condition is a left outer join. The original limit stays
         * in place with its offset reset to 0.
         *
         * <p>The rewrite is skipped when the join has no conditions or when its first input is not
         * a {@link ReduceSinkOperator}.
         *
         * @param limit the limit operator whose first parent is a join
         * @throws SemanticException if a follow-up rewrite fails
         */
        private void pushThroughLeftOuterJoin(LimitOperator limit) throws SemanticException {
            CommonJoinOperator<?> join = (CommonJoinOperator<?>) limit.getParentOperators().get(0);
            JoinCondDesc[] joinConds = join.getConf().getConds();
            if (joinConds == null || joinConds.length == 0) {
                return;
            }
            JoinCondDesc firstJoinCond = joinConds[0];
            for (JoinCondDesc joinCond : joinConds) {
                if (!firstJoinCond.equals(joinCond)) {
                    return;
                }
            }
            if (firstJoinCond.getType() != JoinDesc.LEFT_OUTER_JOIN) {
                return;
            }
            Operator<?> firstInput = join.getParentOperators().get(0);
            if (!(firstInput instanceof ReduceSinkOperator)) {
                // Skipping the optimization is preferable to failing on an unexpected input type.
                return;
            }
            ReduceSinkOperator reduceSinkOperator = (ReduceSinkOperator) firstInput;
            pushDown((LimitOperator) TopNKeyProcessor.copyDown(reduceSinkOperator, new LimitDesc(limit.getConf())));
            // The copy was built from the original descriptor (presumably including its offset),
            // so the offset on the original is cleared to avoid skipping rows a second time.
            limit.getConf().setOffset(0);
        }
    }
}

