/*
 * Decompiled with CFR 0.152.
 */
package org.apache.phoenix.hive.mapreduce;

import java.io.IOException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.phoenix.compat.CompatUtil;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.hive.mapreduce.PhoenixInputSplit;
import org.apache.phoenix.hive.mapreduce.PhoenixRecordReader;
import org.apache.phoenix.hive.mapreduce.PhoenixResultWritable;
import org.apache.phoenix.hive.ppd.PhoenixPredicateDecomposer;
import org.apache.phoenix.hive.ql.index.IndexSearchCondition;
import org.apache.phoenix.hive.query.PhoenixQueryBuilder;
import org.apache.phoenix.hive.util.PhoenixConnectionUtil;
import org.apache.phoenix.hive.util.PhoenixStorageHandlerUtil;
import org.apache.phoenix.iterate.MapReduceParallelScanGrouper;
import org.apache.phoenix.iterate.ParallelScanGrouper;
import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.query.KeyRange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PhoenixInputFormat<T extends DBWritable>
implements InputFormat<WritableComparable, T> {
    private static final Logger LOG = LoggerFactory.getLogger(PhoenixInputFormat.class);

    public PhoenixInputFormat() {
        if (LOG.isDebugEnabled()) {
            LOG.debug("PhoenixInputFormat created");
        }
    }

    public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException {
        String tableName = jobConf.get("phoenix.table.name");
        String executionEngine = jobConf.get(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE.varname, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE.getDefaultValue());
        if (LOG.isDebugEnabled()) {
            LOG.debug("Target table name at split phase : " + tableName + "with whereCondition :" + jobConf.get("hive.io.filter.text") + " and " + HiveConf.ConfVars.HIVE_EXECUTION_ENGINE.varname + " : " + executionEngine);
        }
        List<IndexSearchCondition> conditionList = null;
        String filterExprSerialized = jobConf.get("hive.io.filter.expr.serialized");
        if (filterExprSerialized != null) {
            ExprNodeGenericFuncDesc filterExpr = SerializationUtilities.deserializeExpression((String)filterExprSerialized);
            PhoenixPredicateDecomposer predicateDecomposer = PhoenixPredicateDecomposer.create(Arrays.asList(jobConf.get("columns").split(",")));
            predicateDecomposer.decomposePredicate((ExprNodeDesc)filterExpr);
            if (predicateDecomposer.isCalledPPD()) {
                conditionList = predicateDecomposer.getSearchConditionList();
            }
        }
        String query = PhoenixQueryBuilder.getInstance().buildQuery(jobConf, tableName, PhoenixStorageHandlerUtil.getReadColumnNames((Configuration)jobConf), conditionList);
        QueryPlan queryPlan = this.getQueryPlan((Configuration)jobConf, query);
        List allSplits = queryPlan.getSplits();
        List<InputSplit> splits = this.generateSplits(jobConf, queryPlan, allSplits, query);
        return splits.toArray(new InputSplit[splits.size()]);
    }

    private List<InputSplit> generateSplits(JobConf jobConf, QueryPlan qplan, List<KeyRange> splits, String query) throws IOException {
        if (qplan == null) {
            throw new NullPointerException();
        }
        if (splits == null) {
            throw new NullPointerException();
        }
        ArrayList<InputSplit> psplits = new ArrayList<InputSplit>(splits.size());
        Path[] tablePaths = FileInputFormat.getInputPaths((JobContext)ShimLoader.getHadoopShims().newJobContext(new Job((Configuration)jobConf)));
        boolean splitByStats = jobConf.getBoolean("split.by.stats", false);
        this.setScanCacheSize(jobConf);
        try (Connection connection = ConnectionFactory.createConnection((Configuration)PhoenixConnectionUtil.getConfiguration(jobConf));){
            RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf((String)qplan.getTableRef().getTable().getPhysicalName().toString()));
            for (List scans : qplan.getScans()) {
                PhoenixInputSplit inputSplit;
                HRegionLocation location = regionLocator.getRegionLocation(((Scan)scans.get(0)).getStartRow(), false);
                long regionSize = CompatUtil.getSize((RegionLocator)regionLocator, (Admin)connection.getAdmin(), (HRegionLocation)location);
                String regionLocation = PhoenixStorageHandlerUtil.getRegionLocation(location, LOG);
                if (splitByStats) {
                    for (Scan aScan : scans) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Split for  scan : " + aScan + "with scanAttribute : " + aScan.getAttributesMap() + " [scanCache, cacheBlock, scanBatch] : [" + aScan.getCaching() + ", " + aScan.getCacheBlocks() + ", " + aScan.getBatch() + "] and  regionLocation : " + regionLocation);
                        }
                        inputSplit = new PhoenixInputSplit(new ArrayList<Scan>(Arrays.asList(aScan)), tablePaths[0], regionLocation, regionSize);
                        inputSplit.setQuery(query);
                        psplits.add(inputSplit);
                    }
                    continue;
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Scan count[" + scans.size() + "] : " + Bytes.toStringBinary((byte[])((Scan)scans.get(0)).getStartRow()) + " ~ " + Bytes.toStringBinary((byte[])((Scan)scans.get(scans.size() - 1)).getStopRow()));
                    LOG.debug("First scan : " + scans.get(0) + "with scanAttribute : " + ((Scan)scans.get(0)).getAttributesMap() + " [scanCache, cacheBlock, scanBatch] : [" + ((Scan)scans.get(0)).getCaching() + ", " + ((Scan)scans.get(0)).getCacheBlocks() + ", " + ((Scan)scans.get(0)).getBatch() + "] and  regionLocation : " + regionLocation);
                    int limit = scans.size();
                    for (int i = 0; i < limit; ++i) {
                        LOG.debug("EXPECTED_UPPER_REGION_KEY[" + i + "] : " + Bytes.toStringBinary((byte[])((Scan)scans.get(i)).getAttribute("_ExpectedUpperRegionKey")));
                    }
                }
                inputSplit = new PhoenixInputSplit(scans, tablePaths[0], regionLocation, regionSize);
                inputSplit.setQuery(query);
                psplits.add(inputSplit);
            }
        }
        return psplits;
    }

    private void setScanCacheSize(JobConf jobConf) {
        int scanCacheSize = jobConf.getInt("hbase.scan.cache", -1);
        if (scanCacheSize > 0) {
            jobConf.setInt("hbase.client.scanner.caching", scanCacheSize);
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Generating splits with scanCacheSize : " + scanCacheSize);
        }
    }

    public RecordReader<WritableComparable, T> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
        QueryPlan queryPlan = this.getQueryPlan((Configuration)job, ((PhoenixInputSplit)split).getQuery());
        Class inputClass = job.getClass("phoenix.input.class", PhoenixResultWritable.class);
        PhoenixRecordReader recordReader = new PhoenixRecordReader(inputClass, (Configuration)job, queryPlan);
        recordReader.initialize(split);
        return recordReader;
    }

    private QueryPlan getQueryPlan(Configuration configuration, String selectStatement) throws IOException {
        try {
            String currentScnValue = configuration.get("phoenix.mr.currentscn.value");
            Properties overridingProps = new Properties();
            if (currentScnValue != null) {
                overridingProps.put("CurrentSCN", currentScnValue);
            }
            java.sql.Connection connection = PhoenixConnectionUtil.getInputConnection(configuration, overridingProps);
            if (selectStatement == null) {
                throw new NullPointerException();
            }
            Statement statement = connection.createStatement();
            PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class);
            if (LOG.isDebugEnabled()) {
                LOG.debug("Compiled query : " + selectStatement);
            }
            QueryPlan queryPlan = pstmt.optimizeQuery(selectStatement);
            queryPlan.iterator((ParallelScanGrouper)MapReduceParallelScanGrouper.getInstance());
            return queryPlan;
        }
        catch (Exception exception) {
            LOG.error(String.format("Failed to get the query plan with error [%s]", exception.getMessage()));
            throw new RuntimeException(exception);
        }
    }
}

