/*
 * Decompiled with CFR 0.152.
 */
package org.apache.phoenix.mapreduce;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.phoenix.jdbc.ConnectionInfo;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.mapreduce.CsvBulkImportUtil;
import org.apache.phoenix.mapreduce.FormatToBytesWritableMapper;
import org.apache.phoenix.mapreduce.FormatToKeyValueReducer;
import org.apache.phoenix.mapreduce.MultiHfileOutputFormat;
import org.apache.phoenix.mapreduce.PhoenixTextInputFormat;
import org.apache.phoenix.mapreduce.bulkload.TableRowkeyPair;
import org.apache.phoenix.mapreduce.bulkload.TargetTableRef;
import org.apache.phoenix.mapreduce.bulkload.TargetTableRefFunctions;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
import org.apache.phoenix.thirdparty.com.google.common.base.Splitter;
import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
import org.apache.phoenix.thirdparty.org.apache.commons.cli.CommandLine;
import org.apache.phoenix.thirdparty.org.apache.commons.cli.DefaultParser;
import org.apache.phoenix.thirdparty.org.apache.commons.cli.HelpFormatter;
import org.apache.phoenix.thirdparty.org.apache.commons.cli.Option;
import org.apache.phoenix.thirdparty.org.apache.commons.cli.Options;
import org.apache.phoenix.thirdparty.org.apache.commons.cli.ParseException;
import org.apache.phoenix.util.ColumnInfo;
import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.StringUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractBulkLoadTool
extends Configured
implements Tool {
    protected static final Logger LOGGER = LoggerFactory.getLogger(AbstractBulkLoadTool.class);
    static final Option ZK_QUORUM_OPT = new Option("z", "zookeeper", true, "Supply zookeeper connection details (optional)");
    static final Option INPUT_PATH_OPT = new Option("i", "input", true, "Input path(s) (comma-separated, mandatory)");
    static final Option OUTPUT_PATH_OPT = new Option("o", "output", true, "Output path for temporary HFiles (optional)");
    static final Option SCHEMA_NAME_OPT = new Option("s", "schema", true, "Phoenix schema name (optional)");
    static final Option TABLE_NAME_OPT = new Option("t", "table", true, "Phoenix table name (mandatory)");
    static final Option INDEX_TABLE_NAME_OPT = new Option("it", "index-table", true, "Phoenix index table name when just loading this particualar index table");
    static final Option IMPORT_COLUMNS_OPT = new Option("c", "import-columns", true, "Comma-separated list of columns to be imported");
    static final Option IGNORE_ERRORS_OPT = new Option("g", "ignore-errors", false, "Ignore input errors");
    static final Option HELP_OPT = new Option("h", "help", false, "Show this help and quit");
    static final Option SKIP_HEADER_OPT = new Option("k", "skip-header", false, "Skip the first line of CSV files (the header)");
    static final Option ENABLE_CORRUPT_INDEXES = new Option("corruptindexes", "corruptindexes", false, "Allow bulk loading into non-empty tables with global secondary indexes");

    protected abstract void configureOptions(CommandLine var1, List<ColumnInfo> var2, Configuration var3) throws SQLException;

    protected abstract void setupJob(Job var1);

    protected Options getOptions() {
        Options options = new Options();
        options.addOption(INPUT_PATH_OPT);
        options.addOption(TABLE_NAME_OPT);
        options.addOption(INDEX_TABLE_NAME_OPT);
        options.addOption(ZK_QUORUM_OPT);
        options.addOption(OUTPUT_PATH_OPT);
        options.addOption(SCHEMA_NAME_OPT);
        options.addOption(IMPORT_COLUMNS_OPT);
        options.addOption(IGNORE_ERRORS_OPT);
        options.addOption(HELP_OPT);
        options.addOption(SKIP_HEADER_OPT);
        options.addOption(ENABLE_CORRUPT_INDEXES);
        return options;
    }

    protected CommandLine parseOptions(String[] args) {
        Options options = this.getOptions();
        DefaultParser parser = DefaultParser.builder().setAllowPartialMatching(false).setStripLeadingAndTrailingQuotes(Boolean.valueOf(false)).build();
        CommandLine cmdLine = null;
        try {
            cmdLine = parser.parse(options, args);
        }
        catch (ParseException e) {
            this.printHelpAndExit("Error parsing command line options: " + e.getMessage(), options);
        }
        if (cmdLine.hasOption(HELP_OPT.getOpt())) {
            this.printHelpAndExit(options, 0);
        }
        if (!cmdLine.hasOption(TABLE_NAME_OPT.getOpt())) {
            throw new IllegalStateException(TABLE_NAME_OPT.getLongOpt() + " is a mandatory parameter");
        }
        if (!cmdLine.getArgList().isEmpty()) {
            throw new IllegalStateException("Got unexpected extra parameters: " + cmdLine.getArgList());
        }
        if (!cmdLine.hasOption(INPUT_PATH_OPT.getOpt())) {
            throw new IllegalStateException(INPUT_PATH_OPT.getLongOpt() + " is a mandatory parameter");
        }
        return cmdLine;
    }

    private void printHelpAndExit(String errorMessage, Options options) {
        System.err.println(errorMessage);
        this.printHelpAndExit(options, 1);
    }

    private void printHelpAndExit(Options options, int exitCode) {
        HelpFormatter formatter = new HelpFormatter();
        formatter.printHelp("help", options);
        System.exit(exitCode);
    }

    public int run(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create((Configuration)this.getConf());
        CommandLine cmdLine = null;
        try {
            cmdLine = this.parseOptions(args);
        }
        catch (IllegalStateException e) {
            this.printHelpAndExit(e.getMessage(), this.getOptions());
        }
        try {
            return this.loadData(conf, cmdLine);
        }
        catch (Exception e) {
            e.printStackTrace();
            return -1;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private int loadData(Configuration conf, CommandLine cmdLine) throws Exception {
        Path outputPath;
        String tableName = cmdLine.getOptionValue(TABLE_NAME_OPT.getOpt());
        String schemaName = cmdLine.getOptionValue(SCHEMA_NAME_OPT.getOpt());
        String indexTableName = cmdLine.getOptionValue(INDEX_TABLE_NAME_OPT.getOpt());
        String qualifiedTableName = SchemaUtil.getQualifiedTableName((String)schemaName, (String)tableName);
        String qualifiedIndexTableName = null;
        if (indexTableName != null) {
            qualifiedIndexTableName = SchemaUtil.getQualifiedTableName((String)schemaName, (String)indexTableName);
        }
        if (cmdLine.hasOption(ZK_QUORUM_OPT.getOpt())) {
            String zkQuorum = cmdLine.getOptionValue(ZK_QUORUM_OPT.getOpt());
            ConnectionInfo info = ConnectionInfo.create((String)("jdbc:phoenix+zk:" + zkQuorum), (Configuration)conf, null, null);
            LOGGER.info("Configuring HBase connection to {}", (Object)info);
            for (Map.Entry entry : info.asProps()) {
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("Setting {} = {}", entry.getKey(), entry.getValue());
                }
                conf.set((String)entry.getKey(), (String)entry.getValue());
            }
        }
        if (cmdLine.hasOption(SKIP_HEADER_OPT.getOpt())) {
            PhoenixTextInputFormat.setSkipHeader(conf);
        }
        String inputPaths = cmdLine.getOptionValue(INPUT_PATH_OPT.getOpt());
        ArrayList<TargetTableRef> tablesToBeLoaded = new ArrayList<TargetTableRef>();
        boolean hasLocalIndexes = false;
        try (java.sql.Connection conn = QueryUtil.getConnection((Configuration)conf);){
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Reading columns from {} :: {}", (Object)((PhoenixConnection)conn).getURL(), (Object)qualifiedTableName);
            }
            List<ColumnInfo> importColumns = this.buildImportColumns(conn, cmdLine, qualifiedTableName);
            Preconditions.checkNotNull(importColumns);
            Preconditions.checkArgument((!importColumns.isEmpty() ? 1 : 0) != 0, (Object)"Column info list is empty");
            FormatToBytesWritableMapper.configureColumnInfoList(conf, importColumns);
            boolean ignoreInvalidRows = cmdLine.hasOption(IGNORE_ERRORS_OPT.getOpt());
            conf.setBoolean("phoenix.mapreduce.import.ignoreinvalidrow", ignoreInvalidRows);
            String tbn = SchemaUtil.getEscapedFullTableName((String)qualifiedTableName);
            conf.set("phoenix.mapreduce.import.tablename", tbn);
            this.configureOptions(cmdLine, importColumns, conf);
            String sName = SchemaUtil.normalizeIdentifier((String)schemaName);
            String tName = SchemaUtil.normalizeIdentifier((String)tableName);
            ResultSet rsempty = conn.createStatement().executeQuery("SELECT * FROM " + tbn + " LIMIT 1");
            boolean tableNotEmpty = rsempty.next();
            rsempty.close();
            try {
                this.validateTable(conn, sName, tName);
            }
            finally {
                conn.close();
            }
            outputPath = cmdLine.hasOption(OUTPUT_PATH_OPT.getOpt()) ? new Path(cmdLine.getOptionValue(OUTPUT_PATH_OPT.getOpt())) : new Path("/tmp/" + UUID.randomUUID());
            PTable table = PhoenixRuntime.getTable((java.sql.Connection)conn, (String)qualifiedTableName);
            tablesToBeLoaded.add(new TargetTableRef(qualifiedTableName, table.getPhysicalName().getString()));
            boolean hasGlobalIndexes = false;
            for (PTable index : table.getIndexes()) {
                if (index.getIndexType() == PTable.IndexType.LOCAL) {
                    boolean bl = hasLocalIndexes = qualifiedIndexTableName == null ? true : index.getTableName().getString().equals(qualifiedIndexTableName);
                    if (hasLocalIndexes && hasGlobalIndexes) break;
                }
                if (!IndexUtil.isGlobalIndex((PTable)index)) continue;
                hasGlobalIndexes = true;
                if (!hasLocalIndexes || !hasGlobalIndexes) continue;
                break;
            }
            if (hasGlobalIndexes && tableNotEmpty && !cmdLine.hasOption(ENABLE_CORRUPT_INDEXES.getOpt())) {
                throw new IllegalStateException("Bulk Loading error: Bulk loading is disabled for non empty tables with global indexes, because it will corrupt the global index table in most cases.\nUse the --corruptindexes option to override this check.");
            }
            tablesToBeLoaded.addAll(this.getIndexTables(conn, qualifiedTableName));
            if (qualifiedIndexTableName != null) {
                TargetTableRef targetIndexRef = null;
                for (TargetTableRef tmpTable : tablesToBeLoaded) {
                    if (tmpTable.getLogicalName().compareToIgnoreCase(qualifiedIndexTableName) != 0) continue;
                    targetIndexRef = tmpTable;
                    break;
                }
                if (targetIndexRef == null) {
                    throw new IllegalStateException("Bulk Loader error: index table " + qualifiedIndexTableName + " doesn't exist");
                }
                tablesToBeLoaded.clear();
                tablesToBeLoaded.add(targetIndexRef);
            }
        }
        return this.submitJob(conf, tableName, inputPaths, outputPath, tablesToBeLoaded, hasLocalIndexes);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public int submitJob(Configuration conf, String qualifiedTableName, String inputPaths, Path outputPath, List<TargetTableRef> tablesToBeLoaded, boolean hasLocalIndexes) throws Exception {
        Job job = Job.getInstance((Configuration)conf, (String)("Phoenix MapReduce import for " + qualifiedTableName));
        FileInputFormat.addInputPaths((Job)job, (String)inputPaths);
        FileOutputFormat.setOutputPath((Job)job, (Path)outputPath);
        job.setInputFormatClass(PhoenixTextInputFormat.class);
        job.setMapOutputKeyClass(TableRowkeyPair.class);
        job.setMapOutputValueClass(ImmutableBytesWritable.class);
        job.setOutputKeyClass(TableRowkeyPair.class);
        job.setOutputValueClass(KeyValue.class);
        job.setReducerClass(FormatToKeyValueReducer.class);
        byte[][] splitKeysBeforeJob = null;
        try (Connection hbaseConn = ConnectionFactory.createConnection((Configuration)job.getConfiguration());){
            RegionLocator regionLocator = null;
            if (hasLocalIndexes) {
                try {
                    regionLocator = hbaseConn.getRegionLocator(TableName.valueOf((String)qualifiedTableName));
                    splitKeysBeforeJob = regionLocator.getStartKeys();
                }
                finally {
                    if (regionLocator != null) {
                        regionLocator.close();
                    }
                }
            }
            MultiHfileOutputFormat.configureIncrementalLoad(job, tablesToBeLoaded);
            String tableNamesAsJson = (String)TargetTableRefFunctions.NAMES_TO_JSON.apply(tablesToBeLoaded);
            String logicalNamesAsJson = (String)TargetTableRefFunctions.LOGICAL_NAMES_TO_JSON.apply(tablesToBeLoaded);
            job.getConfiguration().set("phoenix.mapreduce.import.tablenames", tableNamesAsJson);
            job.getConfiguration().set("phoenix.mapreduce.import.logicalnames", logicalNamesAsJson);
            this.setupJob(job);
            LOGGER.info("Running MapReduce import job from {} to {}", (Object)inputPaths, (Object)outputPath);
            boolean success = job.waitForCompletion(true);
            if (success) {
                if (hasLocalIndexes) {
                    try {
                        regionLocator = hbaseConn.getRegionLocator(TableName.valueOf((String)qualifiedTableName));
                        if (!IndexUtil.matchingSplitKeys((byte[][])splitKeysBeforeJob, (byte[][])regionLocator.getStartKeys())) {
                            LOGGER.error("The table " + qualifiedTableName + " has local indexes and there is split key mismatch before and after running bulkload job. Please rerun the job otherwise there may be inconsistencies between actual data and index data.");
                            int n = -1;
                            return n;
                        }
                    }
                    finally {
                        if (regionLocator != null) {
                            regionLocator.close();
                        }
                    }
                }
                LOGGER.info("Loading HFiles from {}", (Object)outputPath);
                this.completebulkload(conf, outputPath, tablesToBeLoaded);
                LOGGER.info("Removing output directory {}", (Object)outputPath);
                if (!outputPath.getFileSystem(conf).delete(outputPath, true)) {
                    LOGGER.error("Failed to delete the output directory {}", (Object)outputPath);
                }
                int n = 0;
                return n;
            }
            int n = -1;
            return n;
        }
    }

    private void completebulkload(Configuration conf, Path outputPath, List<TargetTableRef> tablesToBeLoaded) throws Exception {
        HashSet<String> tableNames = new HashSet<String>(tablesToBeLoaded.size());
        for (TargetTableRef table : tablesToBeLoaded) {
            if (tableNames.contains(table.getPhysicalName())) continue;
            tableNames.add(table.getPhysicalName());
            BulkLoadHFiles loader = BulkLoadHFiles.create((Configuration)conf);
            String tableName = table.getPhysicalName();
            Path tableOutputPath = CsvBulkImportUtil.getOutputPath(outputPath, tableName);
            LOGGER.info("Loading HFiles for {} from {}", (Object)tableName, (Object)tableOutputPath);
            loader.bulkLoad(TableName.valueOf((String)tableName), tableOutputPath);
            LOGGER.info("Incremental load complete for table=" + tableName);
        }
    }

    List<ColumnInfo> buildImportColumns(java.sql.Connection conn, CommandLine cmdLine, String qualifiedTableName) throws SQLException {
        ArrayList userSuppliedColumnNames = null;
        if (cmdLine.hasOption(IMPORT_COLUMNS_OPT.getOpt())) {
            userSuppliedColumnNames = Lists.newArrayList((Iterable)Splitter.on((String)",").trimResults().split((CharSequence)cmdLine.getOptionValue(IMPORT_COLUMNS_OPT.getOpt())));
        }
        return SchemaUtil.generateColumnInfo((java.sql.Connection)conn, (String)qualifiedTableName, userSuppliedColumnNames, (boolean)true);
    }

    private void validateTable(java.sql.Connection conn, String schemaName, String tableName) throws SQLException {
        ResultSet rs = conn.getMetaData().getColumns(null, StringUtil.escapeLike((String)schemaName), StringUtil.escapeLike((String)tableName), null);
        while (rs.next()) {
            String familyName = rs.getString("COLUMN_FAMILY");
            if (familyName == null || !familyName.startsWith("_")) continue;
            if ("0".equals(familyName)) {
                throw new IllegalStateException("Bulk Loader error: All column names that are not part of the primary key constraint must be prefixed with a column family name (i.e. f.my_column VARCHAR)");
            }
            throw new IllegalStateException("Bulk Loader error: Column family name must not start with '_': " + familyName);
        }
        rs.close();
    }

    private List<TargetTableRef> getIndexTables(java.sql.Connection conn, String qualifiedTableName) throws SQLException {
        PTable table = PhoenixRuntime.getTable((java.sql.Connection)conn, (String)qualifiedTableName);
        ArrayList<TargetTableRef> indexTables = new ArrayList<TargetTableRef>();
        for (PTable indexTable : table.getIndexes()) {
            indexTables.add(new TargetTableRef(indexTable.getName().getString(), indexTable.getPhysicalName().getString()));
        }
        return indexTables;
    }
}

