/*
 * Decompiled with CFR 0.152.
 */
package org.apache.phoenix.pig;

import java.sql.Array;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.phoenix.pig.BasePigIT;
import org.apache.phoenix.pig.PhoenixHBaseLoader;
import org.apache.phoenix.pig.PhoenixHBaseStorage;
import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.pig.PigServer;
import org.apache.pig.builtin.mock.Storage;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PhoenixHBaseLoaderIT
extends BasePigIT {
    private static final Logger LOG = LoggerFactory.getLogger(PhoenixHBaseLoaderIT.class);
    private static final String SCHEMA_NAME = "T";
    private static final String TABLE_NAME = "A";
    private static final String INDEX_NAME = "I";
    private static final String TABLE_FULL_NAME = SchemaUtil.getTableName((String)"T", (String)"A");
    private static final String CASE_SENSITIVE_TABLE_NAME = SchemaUtil.getEscapedArgument((String)"a");
    private static final String CASE_SENSITIVE_TABLE_FULL_NAME = SchemaUtil.getTableName((String)"T", (String)CASE_SENSITIVE_TABLE_NAME);

    @Test
    public void testSchemaForTable() throws Exception {
        String TABLE = "TABLE1";
        String ddl = String.format("CREATE TABLE %s   (a_string varchar not null, a_binary varbinary not null, a_integer integer, cf1.a_float float  CONSTRAINT pk PRIMARY KEY (a_string, a_binary))\n", "TABLE1");
        this.conn.createStatement().execute(ddl);
        this.conn.commit();
        this.pigServer.registerQuery(String.format("A = load 'hbase://table/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');", "TABLE1", this.zkQuorum));
        Schema schema = this.pigServer.dumpSchema(TABLE_NAME);
        List fields = schema.getFields();
        Assert.assertEquals((long)4L, (long)fields.size());
        Assert.assertTrue((boolean)((Schema.FieldSchema)fields.get((int)0)).alias.equalsIgnoreCase("a_string"));
        Assert.assertTrue((((Schema.FieldSchema)fields.get((int)0)).type == 55 ? 1 : 0) != 0);
        Assert.assertTrue((boolean)((Schema.FieldSchema)fields.get((int)1)).alias.equalsIgnoreCase("a_binary"));
        Assert.assertTrue((((Schema.FieldSchema)fields.get((int)1)).type == 50 ? 1 : 0) != 0);
        Assert.assertTrue((boolean)((Schema.FieldSchema)fields.get((int)2)).alias.equalsIgnoreCase("a_integer"));
        Assert.assertTrue((((Schema.FieldSchema)fields.get((int)2)).type == 10 ? 1 : 0) != 0);
        Assert.assertTrue((boolean)((Schema.FieldSchema)fields.get((int)3)).alias.equalsIgnoreCase("a_float"));
        Assert.assertTrue((((Schema.FieldSchema)fields.get((int)3)).type == 20 ? 1 : 0) != 0);
    }

    @Test
    public void testSchemaForTableWithSpecificColumns() throws Exception {
        String TABLE = "TABLE2";
        String ddl = "CREATE TABLE TABLE2  (ID INTEGER NOT NULL PRIMARY KEY,NAME VARCHAR, AGE INTEGER) ";
        this.conn.createStatement().execute("CREATE TABLE TABLE2  (ID INTEGER NOT NULL PRIMARY KEY,NAME VARCHAR, AGE INTEGER) ");
        String selectColumns = "ID,NAME";
        this.pigServer.registerQuery(String.format("A = load 'hbase://table/%s/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');", "TABLE2", "ID,NAME", this.zkQuorum));
        Schema schema = this.pigServer.dumpSchema(TABLE_NAME);
        List fields = schema.getFields();
        Assert.assertEquals((long)2L, (long)fields.size());
        Assert.assertTrue((boolean)((Schema.FieldSchema)fields.get((int)0)).alias.equalsIgnoreCase("ID"));
        Assert.assertTrue((((Schema.FieldSchema)fields.get((int)0)).type == 10 ? 1 : 0) != 0);
        Assert.assertTrue((boolean)((Schema.FieldSchema)fields.get((int)1)).alias.equalsIgnoreCase("NAME"));
        Assert.assertTrue((((Schema.FieldSchema)fields.get((int)1)).type == 55 ? 1 : 0) != 0);
    }

    @Test
    public void testSchemaForQuery() throws Exception {
        String TABLE = "TABLE3";
        String ddl = String.format("CREATE TABLE TABLE3  (A_STRING VARCHAR NOT NULL, A_DECIMAL DECIMAL NOT NULL, CF1.A_INTEGER INTEGER, CF2.A_DOUBLE DOUBLE  CONSTRAINT pk PRIMARY KEY (A_STRING, A_DECIMAL))\n", "TABLE3");
        this.conn.createStatement().execute(ddl);
        String sqlQuery = "SELECT A_STRING,CF1.A_INTEGER,CF2.A_DOUBLE FROM TABLE3";
        this.pigServer.registerQuery(String.format("A = load 'hbase://query/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');", "SELECT A_STRING,CF1.A_INTEGER,CF2.A_DOUBLE FROM TABLE3", this.zkQuorum));
        Schema schema = this.pigServer.dumpSchema(TABLE_NAME);
        List fields = schema.getFields();
        Assert.assertEquals((long)3L, (long)fields.size());
        Assert.assertTrue((boolean)((Schema.FieldSchema)fields.get((int)0)).alias.equalsIgnoreCase("a_string"));
        Assert.assertTrue((((Schema.FieldSchema)fields.get((int)0)).type == 55 ? 1 : 0) != 0);
        Assert.assertTrue((boolean)((Schema.FieldSchema)fields.get((int)1)).alias.equalsIgnoreCase("a_integer"));
        Assert.assertTrue((((Schema.FieldSchema)fields.get((int)1)).type == 10 ? 1 : 0) != 0);
        Assert.assertTrue((boolean)((Schema.FieldSchema)fields.get((int)2)).alias.equalsIgnoreCase("a_double"));
        Assert.assertTrue((((Schema.FieldSchema)fields.get((int)2)).type == 25 ? 1 : 0) != 0);
    }

    @Test
    public void testSchemaForTableWithAlias() throws Exception {
        String TABLE = "S.TABLE4";
        String ddl = "CREATE TABLE  S.TABLE4  (A_STRING VARCHAR NOT NULL, A_DECIMAL DECIMAL NOT NULL, CF1.A_INTEGER INTEGER, CF2.A_DOUBLE DOUBLE  CONSTRAINT pk PRIMARY KEY (A_STRING, A_DECIMAL)) \n";
        this.conn.createStatement().execute(ddl);
        String sqlQuery = "SELECT A_STRING,A_DECIMAL,CF1.A_INTEGER,CF2.A_DOUBLE FROM S.TABLE4";
        LOG.info(String.format("Generated SQL Query [%s]", "SELECT A_STRING,A_DECIMAL,CF1.A_INTEGER,CF2.A_DOUBLE FROM S.TABLE4"));
        this.pigServer.registerQuery(String.format("raw = load 'hbase://query/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s') AS (a:chararray,b:bigdecimal,c:int,d:double);", "SELECT A_STRING,A_DECIMAL,CF1.A_INTEGER,CF2.A_DOUBLE FROM S.TABLE4", this.zkQuorum));
        Schema schema = this.pigServer.dumpSchema("raw");
        List fields = schema.getFields();
        Assert.assertEquals((long)4L, (long)fields.size());
        Assert.assertTrue((boolean)((Schema.FieldSchema)fields.get((int)0)).alias.equalsIgnoreCase("a"));
        Assert.assertTrue((((Schema.FieldSchema)fields.get((int)0)).type == 55 ? 1 : 0) != 0);
        Assert.assertTrue((boolean)((Schema.FieldSchema)fields.get((int)1)).alias.equalsIgnoreCase("b"));
        Assert.assertTrue((((Schema.FieldSchema)fields.get((int)1)).type == 70 ? 1 : 0) != 0);
        Assert.assertTrue((boolean)((Schema.FieldSchema)fields.get((int)2)).alias.equalsIgnoreCase("c"));
        Assert.assertTrue((((Schema.FieldSchema)fields.get((int)2)).type == 10 ? 1 : 0) != 0);
        Assert.assertTrue((boolean)((Schema.FieldSchema)fields.get((int)3)).alias.equalsIgnoreCase("d"));
        Assert.assertTrue((((Schema.FieldSchema)fields.get((int)3)).type == 25 ? 1 : 0) != 0);
    }

    @Test
    public void testDataForTable() throws Exception {
        String ddl = "CREATE TABLE  " + CASE_SENSITIVE_TABLE_FULL_NAME + "  (ID  INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, AGE INTEGER) ";
        this.conn.createStatement().execute(ddl);
        String dml = "UPSERT INTO " + CASE_SENSITIVE_TABLE_FULL_NAME + " VALUES(?,?,?)";
        PreparedStatement stmt = this.conn.prepareStatement(dml);
        int rows = 20;
        for (int i = 0; i < rows; ++i) {
            stmt.setInt(1, i);
            stmt.setString(2, "a" + i);
            stmt.setInt(3, i % 2 == 0 ? 25 : 30);
            stmt.execute();
        }
        this.conn.commit();
        this.pigServer.registerQuery(String.format("A = load 'hbase://table/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');", CASE_SENSITIVE_TABLE_FULL_NAME, this.zkQuorum));
        this.pigServer.registerQuery("B = FILTER A BY AGE > 25;");
        Iterator iterator = this.pigServer.openIterator("B");
        int recordsRead = 0;
        while (iterator.hasNext()) {
            Tuple each = (Tuple)iterator.next();
            Assert.assertEquals((long)3L, (long)each.size());
            ++recordsRead;
        }
        Assert.assertEquals((long)(rows / 2), (long)recordsRead);
    }

    @Test
    public void testDataForSQLQuery() throws Exception {
        String ddl = "CREATE TABLE  " + TABLE_FULL_NAME + "  (ID  INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, AGE INTEGER) ";
        this.conn.createStatement().execute(ddl);
        String dml = "UPSERT INTO " + TABLE_FULL_NAME + " VALUES(?,?,?)";
        PreparedStatement stmt = this.conn.prepareStatement(dml);
        int rows = 20;
        for (int i = 0; i < rows; ++i) {
            stmt.setInt(1, i);
            stmt.setString(2, "a" + i);
            stmt.setInt(3, i % 2 == 0 ? 25 : 30);
            stmt.execute();
        }
        this.conn.commit();
        String sqlQuery = " SELECT ID,NAME,AGE FROM " + TABLE_FULL_NAME + " WHERE AGE > 25";
        this.pigServer.registerQuery(String.format("A = load 'hbase://query/%s' using org.apache.phoenix.pig.PhoenixHBaseLoader('%s');", sqlQuery, this.zkQuorum));
        Iterator iterator = this.pigServer.openIterator(TABLE_NAME);
        int recordsRead = 0;
        while (iterator.hasNext()) {
            iterator.next();
            ++recordsRead;
        }
        Assert.assertEquals((long)(rows / 2), (long)recordsRead);
    }

    @Test
    public void testForNonPKSQLQuery() throws Exception {
        String TABLE = "TABLE5";
        String ddl = "CREATE TABLE  TABLE5 ( ID VARCHAR PRIMARY KEY, FOO VARCHAR, BAR INTEGER, BAZ UNSIGNED_INT)";
        this.conn.createStatement().execute(ddl);
        String dml = "UPSERT INTO TABLE5 VALUES(?,?,?,?) ";
        PreparedStatement stmt = this.conn.prepareStatement("UPSERT INTO TABLE5 VALUES(?,?,?,?) ");
        stmt.setString(1, "a");
        stmt.setString(2, "a");
        stmt.setInt(3, -1);
        stmt.setInt(4, 1);
        stmt.execute();
        stmt.setString(1, "b");
        stmt.setString(2, "b");
        stmt.setInt(3, -2);
        stmt.setInt(4, 2);
        stmt.execute();
        this.conn.commit();
        String sqlQuery = String.format(" SELECT FOO, BAZ FROM %s WHERE BAR = -1 ", "TABLE5");
        this.pigServer.registerQuery(String.format("A = load 'hbase://query/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');", sqlQuery, this.zkQuorum));
        Iterator iterator = this.pigServer.openIterator(TABLE_NAME);
        int recordsRead = 0;
        while (iterator.hasNext()) {
            Tuple tuple = (Tuple)iterator.next();
            Assert.assertEquals((Object)"a", (Object)tuple.get(0));
            Assert.assertEquals((Object)1, (Object)tuple.get(1));
            ++recordsRead;
        }
        Assert.assertEquals((long)1L, (long)recordsRead);
        Schema schema = this.pigServer.dumpSchema(TABLE_NAME);
        List fields = schema.getFields();
        Assert.assertEquals((long)2L, (long)fields.size());
        Assert.assertTrue((boolean)((Schema.FieldSchema)fields.get((int)0)).alias.equalsIgnoreCase("FOO"));
        Assert.assertTrue((((Schema.FieldSchema)fields.get((int)0)).type == 55 ? 1 : 0) != 0);
        Assert.assertTrue((boolean)((Schema.FieldSchema)fields.get((int)1)).alias.equalsIgnoreCase("BAZ"));
        Assert.assertTrue((((Schema.FieldSchema)fields.get((int)1)).type == 10 ? 1 : 0) != 0);
    }

    @Test
    public void testGroupingOfDataForTable() throws Exception {
        String TABLE = "TABLE6";
        String ddl = "CREATE TABLE  TABLE6  (ID  INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, AGE INTEGER, SAL INTEGER) ";
        this.conn.createStatement().execute(ddl);
        String dml = "UPSERT INTO TABLE6 VALUES(?,?,?,?)";
        PreparedStatement stmt = this.conn.prepareStatement("UPSERT INTO TABLE6 VALUES(?,?,?,?)");
        int rows = 20;
        int j = 0;
        int k = 0;
        for (int i = 0; i < rows; ++i) {
            stmt.setInt(1, i);
            stmt.setString(2, "a" + i);
            if (i % 2 == 0) {
                stmt.setInt(3, 25);
                stmt.setInt(4, 20 * j++);
            } else {
                stmt.setInt(3, 30);
                stmt.setInt(4, 30 * k++);
            }
            stmt.execute();
        }
        this.conn.commit();
        Storage.Data data = Storage.resetData((PigServer)this.pigServer);
        ArrayList<Tuple> expectedList = new ArrayList<Tuple>();
        expectedList.add(Storage.tuple((Object[])new Object[]{0, 180}));
        expectedList.add(Storage.tuple((Object[])new Object[]{0, 270}));
        this.pigServer.setBatchOn();
        this.pigServer.registerQuery(String.format("A = load 'hbase://table/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');", "TABLE6", this.zkQuorum));
        this.pigServer.registerQuery("B = GROUP A BY AGE;");
        this.pigServer.registerQuery("C = FOREACH B GENERATE MIN(A.SAL),MAX(A.SAL);");
        this.pigServer.registerQuery("STORE C INTO 'out' using mock.Storage();");
        this.pigServer.executeBatch();
        List actualList = data.get("out");
        Assert.assertEquals(expectedList, (Object)actualList);
    }

    @Test
    public void testTimestampForSQLQuery() throws Exception {
        String ddl = "CREATE TABLE TIMESTAMP_T (MYKEY VARCHAR,DATE_STP TIMESTAMP CONSTRAINT PK PRIMARY KEY (MYKEY)) ";
        this.conn.createStatement().execute(ddl);
        String dml = "UPSERT INTO TIMESTAMP_T VALUES('foo',TO_TIMESTAMP('2006-04-12 00:00:00'))";
        this.conn.createStatement().execute("UPSERT INTO TIMESTAMP_T VALUES('foo',TO_TIMESTAMP('2006-04-12 00:00:00'))");
        this.conn.commit();
        String sqlQuery = " SELECT mykey, year(DATE_STP) FROM TIMESTAMP_T ";
        this.pigServer.registerQuery(String.format("A = load 'hbase://query/%s' using org.apache.phoenix.pig.PhoenixHBaseLoader('%s');", " SELECT mykey, year(DATE_STP) FROM TIMESTAMP_T ", this.zkQuorum));
        Iterator iterator = this.pigServer.openIterator(TABLE_NAME);
        while (iterator.hasNext()) {
            Tuple tuple = (Tuple)iterator.next();
            Assert.assertEquals((Object)"foo", (Object)tuple.get(0));
            Assert.assertEquals((Object)2006, (Object)tuple.get(1));
        }
    }

    @Test
    public void testDateForSQLQuery() throws Exception {
        String ddl = "CREATE TABLE DATE_T (MYKEY VARCHAR,DATE_STP Date CONSTRAINT PK PRIMARY KEY (MYKEY)) ";
        this.conn.createStatement().execute(ddl);
        String dml = "UPSERT INTO DATE_T VALUES('foo',TO_DATE('2004-03-10 10:00:00'))";
        this.conn.createStatement().execute("UPSERT INTO DATE_T VALUES('foo',TO_DATE('2004-03-10 10:00:00'))");
        this.conn.commit();
        String sqlQuery = " SELECT mykey, hour(DATE_STP) FROM DATE_T ";
        this.pigServer.registerQuery(String.format("A = load 'hbase://query/%s' using org.apache.phoenix.pig.PhoenixHBaseLoader('%s');", " SELECT mykey, hour(DATE_STP) FROM DATE_T ", this.zkQuorum));
        Iterator iterator = this.pigServer.openIterator(TABLE_NAME);
        while (iterator.hasNext()) {
            Tuple tuple = (Tuple)iterator.next();
            Assert.assertEquals((Object)"foo", (Object)tuple.get(0));
            Assert.assertEquals((Object)10, (Object)tuple.get(1));
        }
    }

    @Test
    public void testTimeForSQLQuery() throws Exception {
        String ddl = "CREATE TABLE TIME_T (MYKEY VARCHAR,DATE_STP TIME CONSTRAINT PK PRIMARY KEY (MYKEY)) ";
        this.conn.createStatement().execute(ddl);
        String dml = "UPSERT INTO TIME_T VALUES('foo',TO_TIME('2008-05-16 00:30:00'))";
        this.conn.createStatement().execute("UPSERT INTO TIME_T VALUES('foo',TO_TIME('2008-05-16 00:30:00'))");
        this.conn.commit();
        String sqlQuery = " SELECT mykey, minute(DATE_STP) FROM TIME_T ";
        this.pigServer.registerQuery(String.format("A = load 'hbase://query/%s' using org.apache.phoenix.pig.PhoenixHBaseLoader('%s');", " SELECT mykey, minute(DATE_STP) FROM TIME_T ", this.zkQuorum));
        Iterator iterator = this.pigServer.openIterator(TABLE_NAME);
        while (iterator.hasNext()) {
            Tuple tuple = (Tuple)iterator.next();
            Assert.assertEquals((Object)"foo", (Object)tuple.get(0));
            Assert.assertEquals((Object)30, (Object)tuple.get(1));
        }
    }

    @Test
    public void testLoadAndStore() throws Exception {
        String TABLE = "TABLE7";
        String sourceTableddl = "CREATE TABLE  TABLE7  (ID  INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, AGE INTEGER, SAL INTEGER) ";
        String targetTable = "AGGREGATE";
        String targetTableddl = "CREATE TABLE AGGREGATE(AGE INTEGER NOT NULL PRIMARY KEY , MIN_SAL INTEGER , MAX_SAL INTEGER) ";
        this.conn.createStatement().execute("CREATE TABLE  TABLE7  (ID  INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, AGE INTEGER, SAL INTEGER) ");
        this.conn.createStatement().execute("CREATE TABLE AGGREGATE(AGE INTEGER NOT NULL PRIMARY KEY , MIN_SAL INTEGER , MAX_SAL INTEGER) ");
        String dml = "UPSERT INTO TABLE7 VALUES(?,?,?,?)";
        PreparedStatement stmt = this.conn.prepareStatement("UPSERT INTO TABLE7 VALUES(?,?,?,?)");
        int rows = 20;
        int j = 0;
        int k = 0;
        for (int i = 0; i < rows; ++i) {
            stmt.setInt(1, i);
            stmt.setString(2, "a" + i);
            if (i % 2 == 0) {
                stmt.setInt(3, 25);
                stmt.setInt(4, 20 * j++);
            } else {
                stmt.setInt(3, 30);
                stmt.setInt(4, 30 * k++);
            }
            stmt.execute();
        }
        this.conn.commit();
        this.pigServer.setBatchOn();
        this.pigServer.registerQuery(String.format("A = load 'hbase://table/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');", "TABLE7", this.zkQuorum));
        this.pigServer.registerQuery("B = GROUP A BY AGE;");
        this.pigServer.registerQuery("C = FOREACH B GENERATE group as AGE,MIN(A.SAL),MAX(A.SAL);");
        this.pigServer.registerQuery("STORE C INTO 'hbase://AGGREGATE' using " + PhoenixHBaseStorage.class.getName() + "('" + this.zkQuorum + "', '-batchSize 1000');");
        this.pigServer.executeBatch();
        String selectQuery = "SELECT AGE , MIN_SAL ,MAX_SAL FROM AGGREGATE ORDER BY AGE";
        ResultSet rs = this.conn.createStatement().executeQuery("SELECT AGE , MIN_SAL ,MAX_SAL FROM AGGREGATE ORDER BY AGE");
        Assert.assertTrue((boolean)rs.next());
        Assert.assertEquals((long)25L, (long)rs.getInt("AGE"));
        Assert.assertEquals((long)0L, (long)rs.getInt("MIN_SAL"));
        Assert.assertEquals((long)180L, (long)rs.getInt("MAX_SAL"));
        Assert.assertTrue((boolean)rs.next());
        Assert.assertEquals((long)30L, (long)rs.getInt("AGE"));
        Assert.assertEquals((long)0L, (long)rs.getInt("MIN_SAL"));
        Assert.assertEquals((long)270L, (long)rs.getInt("MAX_SAL"));
    }

    @Test
    public void testDataForSQLQueryWithSequences() throws Exception {
        String TABLE = "TABLE8";
        String ddl = "CREATE TABLE TABLE8 (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, AGE INTEGER) ";
        this.conn.createStatement().execute(ddl);
        String sequenceDdl = "CREATE SEQUENCE my_sequence";
        this.conn.createStatement().execute(sequenceDdl);
        String dml = "UPSERT INTO TABLE8 VALUES(?,?,?)";
        PreparedStatement stmt = this.conn.prepareStatement("UPSERT INTO TABLE8 VALUES(?,?,?)");
        int rows = 20;
        for (int i = 0; i < rows; ++i) {
            stmt.setInt(1, i);
            stmt.setString(2, "a" + i);
            stmt.setInt(3, i % 2 == 0 ? 25 : 30);
            stmt.execute();
        }
        this.conn.commit();
        String sqlQuery = " SELECT NEXT VALUE FOR my_sequence AS my_seq,ID,NAME,AGE FROM TABLE8 WHERE AGE > 25";
        this.pigServer.registerQuery(String.format("A = load 'hbase://query/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');", " SELECT NEXT VALUE FOR my_sequence AS my_seq,ID,NAME,AGE FROM TABLE8 WHERE AGE > 25", this.zkQuorum));
        Iterator iterator = this.pigServer.openIterator(TABLE_NAME);
        int recordsRead = 0;
        while (iterator.hasNext()) {
            iterator.next();
            ++recordsRead;
        }
        Assert.assertEquals((long)(rows / 2), (long)recordsRead);
    }

    @Test
    public void testDataForSQLQueryWithFunctions() throws Exception {
        String TABLE = "TABLE9";
        String ddl = "CREATE TABLE TABLE9 (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR) ";
        this.conn.createStatement().execute(ddl);
        String dml = "UPSERT INTO TABLE9 VALUES(?,?)";
        PreparedStatement stmt = this.conn.prepareStatement("UPSERT INTO TABLE9 VALUES(?,?)");
        int rows = 20;
        for (int i = 0; i < rows; ++i) {
            stmt.setInt(1, i);
            stmt.setString(2, "a" + i);
            stmt.execute();
        }
        this.conn.commit();
        String sqlQuery = " SELECT UPPER(NAME) AS n FROM TABLE9 ORDER BY ID";
        this.pigServer.registerQuery(String.format("A = load 'hbase://query/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');", " SELECT UPPER(NAME) AS n FROM TABLE9 ORDER BY ID", this.zkQuorum));
        Iterator iterator = this.pigServer.openIterator(TABLE_NAME);
        int i = 0;
        while (iterator.hasNext()) {
            Tuple tuple = (Tuple)iterator.next();
            String name = (String)tuple.get(0);
            Assert.assertEquals((Object)(TABLE_NAME + i), (Object)name);
            ++i;
        }
    }

    @Test
    public void testDataFromIndexTable() throws Exception {
        String ddl = "CREATE TABLE A (ID INTEGER NOT NULL, NAME VARCHAR NOT NULL, EMPLID INTEGER CONSTRAINT pk PRIMARY KEY (ID, NAME)) IMMUTABLE_ROWS=true";
        this.conn.createStatement().execute(ddl);
        String indexDdl = " CREATE INDEX I ON A (EMPLID) INCLUDE (NAME) ";
        this.conn.createStatement().execute(indexDdl);
        String dml = "UPSERT INTO A VALUES(?,?,?)";
        PreparedStatement stmt = this.conn.prepareStatement("UPSERT INTO A VALUES(?,?,?)");
        int rows = 20;
        for (int i = 0; i < rows; ++i) {
            stmt.setInt(1, i);
            stmt.setString(2, "a" + i);
            stmt.setInt(3, i * 5);
            stmt.execute();
        }
        this.conn.commit();
        this.pigServer.registerQuery("A = load 'hbase://query/SELECT NAME , EMPLID FROM A WHERE EMPLID = 25 ' using " + PhoenixHBaseLoader.class.getName() + "('" + this.zkQuorum + "')  ;");
        Iterator iterator = this.pigServer.openIterator(TABLE_NAME);
        while (iterator.hasNext()) {
            Tuple tuple = (Tuple)iterator.next();
            Assert.assertEquals((Object)"a5", (Object)tuple.get(0));
            Assert.assertEquals((Object)25, (Object)tuple.get(1));
        }
    }

    @Test
    public void testLoadOfSaltTable() throws Exception {
        String TABLE = "TABLE11";
        String sourceTableddl = "CREATE TABLE  TABLE11  (ID  INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, AGE INTEGER, SAL INTEGER) SALT_BUCKETS=2  ";
        this.conn.createStatement().execute("CREATE TABLE  TABLE11  (ID  INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, AGE INTEGER, SAL INTEGER) SALT_BUCKETS=2  ");
        String dml = "UPSERT INTO TABLE11 VALUES(?,?,?,?)";
        PreparedStatement stmt = this.conn.prepareStatement("UPSERT INTO TABLE11 VALUES(?,?,?,?)");
        int rows = 20;
        int j = 0;
        int k = 0;
        for (int i = 0; i < rows; ++i) {
            stmt.setInt(1, i);
            stmt.setString(2, "a" + i);
            if (i % 2 == 0) {
                stmt.setInt(3, 25);
                stmt.setInt(4, 20 * j++);
            } else {
                stmt.setInt(3, 30);
                stmt.setInt(4, 30 * k++);
            }
            stmt.execute();
        }
        this.conn.commit();
        Storage.Data data = Storage.resetData((PigServer)this.pigServer);
        ArrayList<Tuple> expectedList = new ArrayList<Tuple>();
        expectedList.add(Storage.tuple((Object[])new Object[]{25, 10}));
        expectedList.add(Storage.tuple((Object[])new Object[]{30, 10}));
        this.pigServer.setBatchOn();
        this.pigServer.registerQuery(String.format("A = load 'hbase://table/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');", "TABLE11", this.zkQuorum));
        this.pigServer.registerQuery("B = GROUP A BY AGE;");
        this.pigServer.registerQuery("C = FOREACH B GENERATE group,COUNT(A);");
        this.pigServer.registerQuery("STORE C INTO 'out' using mock.Storage();");
        this.pigServer.executeBatch();
        List actualList = data.get("out");
        Assert.assertEquals((long)expectedList.size(), (long)actualList.size());
    }

    @Test
    public void testLoadForArrayWithQuery() throws Exception {
        String TABLE = "TABLE14";
        String ddl = "CREATE TABLE  TABLE14 ( ID INTEGER PRIMARY KEY, a_double_array double array[] , a_varchar_array varchar array, a_concat_str varchar, sep varchar)";
        this.conn.createStatement().execute(ddl);
        Object[] doubleArr = new Double[]{2.2, 4.4, 6.6};
        Array doubleArray = this.conn.createArrayOf("DOUBLE", doubleArr);
        Tuple doubleArrTuple = Storage.tuple((Object[])new Object[]{2.2, 4.4, 6.6});
        Object[] doubleArr2 = new Double[]{12.2, 22.2};
        Array doubleArray2 = this.conn.createArrayOf("DOUBLE", doubleArr2);
        Tuple doubleArrTuple2 = Storage.tuple((Object[])new Object[]{12.2, 22.2});
        Object[] strArr = new String[]{"ABC", "DEF", "GHI", "JKL"};
        Array strArray = this.conn.createArrayOf("VARCHAR", strArr);
        Tuple strArrTuple = Storage.tuple((Object[])new Object[]{"ABC", "DEF", "GHI", "JKL"});
        Object[] strArr2 = new String[]{"ABC", "XYZ"};
        Array strArray2 = this.conn.createArrayOf("VARCHAR", strArr2);
        Tuple strArrTuple2 = Storage.tuple((Object[])new Object[]{"ABC", "XYZ"});
        String dml = "UPSERT INTO TABLE14 VALUES(?, ?, ?, ?, ?) ";
        PreparedStatement stmt = this.conn.prepareStatement("UPSERT INTO TABLE14 VALUES(?, ?, ?, ?, ?) ");
        stmt.setInt(1, 1);
        stmt.setArray(2, doubleArray);
        stmt.setArray(3, strArray);
        stmt.setString(4, "ONE,TWO,THREE");
        stmt.setString(5, ",");
        stmt.execute();
        stmt.setInt(1, 2);
        stmt.setArray(2, doubleArray2);
        stmt.setArray(3, strArray2);
        stmt.setString(4, "FOUR:five:six");
        stmt.setString(5, ":");
        stmt.execute();
        this.conn.commit();
        Tuple dynArrTuple = Storage.tuple((Object[])new Object[]{"ONE", "TWO", "THREE"});
        Tuple dynArrTuple2 = Storage.tuple((Object[])new Object[]{"FOUR", "five", "six"});
        String sqlQuery = String.format(" SELECT ID, A_DOUBLE_ARRAY, A_VARCHAR_ARRAY, REGEXP_SPLIT(a_concat_str, sep) AS flattend_str FROM %s ", "TABLE14");
        Storage.Data data = Storage.resetData((PigServer)this.pigServer);
        ArrayList<Tuple> expectedList = new ArrayList<Tuple>();
        expectedList.add(Storage.tuple((Object[])new Object[]{1, 3L, 4L, dynArrTuple}));
        expectedList.add(Storage.tuple((Object[])new Object[]{2, 2L, 2L, dynArrTuple2}));
        String load = String.format("A = load 'hbase://query/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');", sqlQuery, this.zkQuorum);
        this.pigServer.setBatchOn();
        this.pigServer.registerQuery(load);
        this.pigServer.registerQuery("B = FOREACH A GENERATE ID, SIZE(A_DOUBLE_ARRAY), SIZE(A_VARCHAR_ARRAY), FLATTEND_STR;");
        this.pigServer.registerQuery("STORE B INTO 'out' using mock.Storage();");
        this.pigServer.executeBatch();
        List actualList = data.get("out");
        Assert.assertEquals((long)expectedList.size(), (long)actualList.size());
        Assert.assertEquals(expectedList, (Object)actualList);
        Schema schema = this.pigServer.dumpSchema(TABLE_NAME);
        List fields = schema.getFields();
        Assert.assertEquals((long)4L, (long)fields.size());
        Assert.assertTrue((boolean)((Schema.FieldSchema)fields.get((int)0)).alias.equalsIgnoreCase("ID"));
        Assert.assertTrue((((Schema.FieldSchema)fields.get((int)0)).type == 10 ? 1 : 0) != 0);
        Assert.assertTrue((boolean)((Schema.FieldSchema)fields.get((int)1)).alias.equalsIgnoreCase("A_DOUBLE_ARRAY"));
        Assert.assertTrue((((Schema.FieldSchema)fields.get((int)1)).type == 110 ? 1 : 0) != 0);
        Assert.assertTrue((boolean)((Schema.FieldSchema)fields.get((int)2)).alias.equalsIgnoreCase("A_VARCHAR_ARRAY"));
        Assert.assertTrue((((Schema.FieldSchema)fields.get((int)2)).type == 110 ? 1 : 0) != 0);
        Assert.assertTrue((boolean)((Schema.FieldSchema)fields.get((int)3)).alias.equalsIgnoreCase("FLATTEND_STR"));
        Assert.assertTrue((((Schema.FieldSchema)fields.get((int)3)).type == 110 ? 1 : 0) != 0);
        Iterator iterator = this.pigServer.openIterator(TABLE_NAME);
        Tuple firstTuple = Storage.tuple((Object[])new Object[]{1, doubleArrTuple, strArrTuple, dynArrTuple});
        Tuple secondTuple = Storage.tuple((Object[])new Object[]{2, doubleArrTuple2, strArrTuple2, dynArrTuple2});
        ArrayList expectedRows = Lists.newArrayList((Object[])new Tuple[]{firstTuple, secondTuple});
        ArrayList actualRows = Lists.newArrayList();
        while (iterator.hasNext()) {
            Tuple tuple = (Tuple)iterator.next();
            actualRows.add(tuple);
        }
        Assert.assertEquals((Object)expectedRows, (Object)actualRows);
    }

    @Test
    public void testLoadForArrayWithTable() throws Exception {
        String TABLE = "TABLE15";
        String ddl = "CREATE TABLE  TABLE15 ( ID INTEGER PRIMARY KEY, a_double_array double array[])";
        this.conn.createStatement().execute(ddl);
        Object[] doubleArr = new Double[]{2.2, 4.4, 6.6};
        Array doubleArray = this.conn.createArrayOf("DOUBLE", doubleArr);
        Tuple doubleArrTuple = Storage.tuple((Object[])new Object[]{2.2, 4.4, 6.6});
        Object[] doubleArr2 = new Double[]{12.2, 22.2};
        Array doubleArray2 = this.conn.createArrayOf("DOUBLE", doubleArr2);
        Tuple doubleArrTuple2 = Storage.tuple((Object[])new Object[]{12.2, 22.2});
        String dml = "UPSERT INTO TABLE15 VALUES(?, ?) ";
        PreparedStatement stmt = this.conn.prepareStatement("UPSERT INTO TABLE15 VALUES(?, ?) ");
        stmt.setInt(1, 1);
        stmt.setArray(2, doubleArray);
        stmt.execute();
        stmt.setInt(1, 2);
        stmt.setArray(2, doubleArray2);
        stmt.execute();
        this.conn.commit();
        Storage.Data data = Storage.resetData((PigServer)this.pigServer);
        ArrayList<Tuple> expectedList = new ArrayList<Tuple>();
        expectedList.add(Storage.tuple((Object[])new Object[]{1, doubleArrTuple}));
        expectedList.add(Storage.tuple((Object[])new Object[]{2, doubleArrTuple2}));
        this.pigServer.setBatchOn();
        this.pigServer.registerQuery(String.format("A = load 'hbase://table/%s' using " + PhoenixHBaseLoader.class.getName() + "('%s');", "TABLE15", this.zkQuorum));
        this.pigServer.registerQuery("STORE A INTO 'out' using mock.Storage();");
        this.pigServer.executeBatch();
        List actualList = data.get("out");
        Assert.assertEquals((long)expectedList.size(), (long)actualList.size());
        Assert.assertEquals(expectedList, (Object)actualList);
    }
}

