/*
 * Decompiled with CFR 0.152.
 */
package org.apache.phoenix.iterate;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
import org.apache.phoenix.end2end.ParallelStatsDisabledTest;
import org.apache.phoenix.iterate.MockParallelIteratorFactory;
import org.apache.phoenix.iterate.ParallelIteratorFactory;
import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.iterate.RoundRobinResultIterator;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixResultSet;
import org.apache.phoenix.jdbc.PhoenixStatement;
import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.thirdparty.com.google.common.collect.Sets;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.TestUtil;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category(value={ParallelStatsDisabledTest.class})
public class RoundRobinResultIteratorIT
extends ParallelStatsDisabledIT {
    private static final int NUM_SALT_BUCKETS = 4;

    /**
     * Opens a new JDBC connection to the test cluster using a private copy of the shared
     * test properties with {@code phoenix.query.force.rowkeyorder} set to {@code false}.
     *
     * @return a newly opened connection; this method does not close it
     * @throws SQLException if the driver cannot open the connection
     */
    private static Connection getConnection() throws SQLException {
        final Properties connectionProps = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
        // NOTE(review): presumably row-key ordering is disabled so that the round-robin
        // iterator is eligible for the queries in this class -- confirm against the planner.
        connectionProps.setProperty("phoenix.query.force.rowkeyorder", String.valueOf(false));
        return DriverManager.getConnection(RoundRobinResultIteratorIT.getUrl(), connectionProps);
    }

    /**
     * Checks that a full scan with fetch size 10 returns every inserted row after a split has
     * been requested on the table, and that the region count seen by the client has changed.
     *
     * <p>The split is requested through {@link Admin#split}; the test then polls the region
     * count (at most 10 times, 2 seconds apart) until it differs from the pre-split count
     * before running the query.
     *
     * @throws Exception if table setup, the split request, or the query fails
     */
    @Test
    public void testRoundRobinAfterTableSplit() throws Exception {
        String tableName = RoundRobinResultIteratorIT.generateUniqueName();
        byte[] tableNameBytes = Bytes.toBytes(tableName);
        int numRows = RoundRobinResultIteratorIT.setupTableForSplit(tableName);
        // The connection is now closed even when an assertion or the query fails.
        try (Connection conn = RoundRobinResultIteratorIT.getConnection()) {
            ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices();
            int queryTimeout = services.getProps().getInt("phoenix.query.timeoutMs", 600000);
            int nRegionsBeforeSplit = services.getAllTableRegions(tableNameBytes, queryTimeout).size();
            int nRegions = nRegionsBeforeSplit;
            try (Admin admin = services.getAdmin()) {
                admin.split(TableName.valueOf(tableName));
                // Poll until the region count changes or the retry budget is exhausted.
                long waitTimeMillis = 2000L;
                for (int nTries = 0; nRegions == nRegionsBeforeSplit && nTries < 10; ++nTries) {
                    TimeUnit.MILLISECONDS.sleep(waitTimeMillis);
                    nRegions = services.getAllTableRegions(tableNameBytes, queryTimeout).size();
                }
                int numRowsRead = 0;
                try (Statement stmt = conn.createStatement()) {
                    // Fetch size > 1, the precondition assertRoundRobinBehavior in this class
                    // uses for expecting a RoundRobinResultIterator.
                    stmt.setFetchSize(10);
                    try (ResultSet rs = stmt.executeQuery("SELECT * FROM " + tableName)) {
                        while (rs.next()) {
                            ++numRowsRead;
                        }
                    }
                }
                nRegions = services.getAllTableRegions(tableNameBytes, queryTimeout).size();
                // assertNotEquals takes (unexpected, actual).
                Assert.assertNotEquals(nRegionsBeforeSplit, nRegions);
                Assert.assertEquals(numRows, numRowsRead);
            }
        }
    }

    /** Runs the unfiltered-select fetch-size scenarios against a salted table. */
    @Test
    public void testSelectAllRowsWithDifferentFetchSizes_salted() throws Exception {
        final boolean salted = true;
        this.testSelectAllRowsWithDifferentFetchSizes(salted);
    }

    /** Runs the unfiltered-select fetch-size scenarios against an unsalted table. */
    @Test
    public void testSelectAllRowsWithDifferentFetchSizes_unsalted() throws Exception {
        final boolean salted = false;
        this.testSelectAllRowsWithDifferentFetchSizes(salted);
    }

    /**
     * Creates a 9-row table and re-runs {@code SELECT K, V} over it with a range of fetch
     * sizes, checking each time that all keys come back and that the iterator reports the
     * expected number of parallel fetches.
     *
     * @param salted whether the table is created with salt buckets; this also selects which
     *               expected parallel-fetch counts are asserted
     * @throws Exception if table setup or any query fails
     */
    private void testSelectAllRowsWithDifferentFetchSizes(boolean salted) throws Exception {
        String tableName = RoundRobinResultIteratorIT.generateUniqueName();
        int numRows = 9;
        // Read-only master copy; each attempt below consumes its own mutable copy.
        Set<String> expectedKeys = Collections.unmodifiableSet(this.createTableAndInsertRows(tableName, numRows, salted, false));
        // Connection and statement are now released even when an assertion fails.
        try (Connection conn = RoundRobinResultIteratorIT.getConnection();
                PreparedStatement stmt = conn.prepareStatement("SELECT K, V FROM " + tableName)) {
            RoundRobinResultIteratorIT.tryWithFetchSize(new HashSet<String>(expectedKeys), 1, stmt, 0);
            RoundRobinResultIteratorIT.tryWithFetchSize(new HashSet<String>(expectedKeys), 2, stmt, salted ? 2 : 5);
            RoundRobinResultIteratorIT.tryWithFetchSize(new HashSet<String>(expectedKeys), numRows - 1, stmt, salted ? 0 : 1);
            RoundRobinResultIteratorIT.tryWithFetchSize(new HashSet<String>(expectedKeys), numRows, stmt, salted ? 0 : 1);
            RoundRobinResultIteratorIT.tryWithFetchSize(new HashSet<String>(expectedKeys), numRows + 1, stmt, salted ? 0 : 1);
            RoundRobinResultIteratorIT.tryWithFetchSize(new HashSet<String>(expectedKeys), numRows + 2, stmt, 0);
        }
    }

    /** Runs the filtered-select fetch-size scenarios against an unsalted table. */
    @Test
    public void testSelectRowsWithFilterAndDifferentFetchSizes_unsalted() throws Exception {
        final boolean salted = false;
        this.testSelectRowsWithFilterAndDifferentFetchSizes(salted);
    }

    /** Runs the filtered-select fetch-size scenarios against a salted table. */
    @Test
    public void testSelectRowsWithFilterAndDifferentFetchSizes_salted() throws Exception {
        final boolean salted = true;
        this.testSelectRowsWithFilterAndDifferentFetchSizes(salted);
    }

    /**
     * Creates a 6-row table (keys {@code key1}..{@code key6}) and runs three filtered queries
     * over it with a range of fetch sizes: a point lookup on {@code key1}, a range scan
     * {@code K > 'key2'}, and a range scan {@code K > 'key6'} that matches nothing. Each
     * attempt checks the returned keys and the reported number of parallel fetches.
     *
     * @param salted whether the table is created with salt buckets; this also selects which
     *               expected parallel-fetch counts are asserted
     * @throws Exception if table setup or any query fails
     */
    private void testSelectRowsWithFilterAndDifferentFetchSizes(boolean salted) throws Exception {
        String tableName = RoundRobinResultIteratorIT.generateUniqueName();
        int numRows = 6;
        Set<String> insertedKeys = this.createTableAndInsertRows(tableName, numRows, salted, false);
        // The connection and each prepared statement are now closed on every exit path.
        try (Connection conn = RoundRobinResultIteratorIT.getConnection()) {
            // Point lookup: only "key1" matches.
            try (PreparedStatement stmt = conn.prepareStatement("SELECT K, V FROM " + tableName + " WHERE K = ?")) {
                stmt.setString(1, "key1");
                RoundRobinResultIteratorIT.tryWithFetchSize(Sets.newHashSet("key1"), 1, stmt, 0);
                // One parallel fetch is expected for both salted and unsalted tables.
                RoundRobinResultIteratorIT.tryWithFetchSize(Sets.newHashSet("key1"), 2, stmt, 1);
                RoundRobinResultIteratorIT.tryWithFetchSize(Sets.newHashSet("key1"), 3, stmt, 0);
            }

            // Range scan K > 'key2': the remaining expected keys are key3..key6.
            insertedKeys.remove("key1");
            insertedKeys.remove("key2");
            int numRowsFiltered = 4;
            try (PreparedStatement stmt = conn.prepareStatement("SELECT K, V FROM " + tableName + " WHERE K > ?")) {
                stmt.setString(1, "key2");
                RoundRobinResultIteratorIT.tryWithFetchSize(new HashSet<String>(insertedKeys), 1, stmt, 0);
                RoundRobinResultIteratorIT.tryWithFetchSize(new HashSet<String>(insertedKeys), 2, stmt, salted ? 1 : 2);
                RoundRobinResultIteratorIT.tryWithFetchSize(new HashSet<String>(insertedKeys), numRowsFiltered - 1, stmt, salted ? 0 : 1);
                RoundRobinResultIteratorIT.tryWithFetchSize(new HashSet<String>(insertedKeys), numRowsFiltered, stmt, salted ? 0 : 1);
                RoundRobinResultIteratorIT.tryWithFetchSize(new HashSet<String>(insertedKeys), numRowsFiltered + 1, stmt, salted ? 0 : 1);
                RoundRobinResultIteratorIT.tryWithFetchSize(new HashSet<String>(insertedKeys), numRowsFiltered + 2, stmt, 0);
            }

            // Range scan K > 'key6': no row is expected, so the expected key set is empty.
            insertedKeys.clear();
            try (PreparedStatement stmt = conn.prepareStatement("SELECT K, V FROM " + tableName + " WHERE K > ?")) {
                stmt.setString(1, "key6");
                RoundRobinResultIteratorIT.tryWithFetchSize(new HashSet<String>(insertedKeys), 1, stmt, 0);
                RoundRobinResultIteratorIT.tryWithFetchSize(new HashSet<String>(insertedKeys), 2, stmt, 0);
                RoundRobinResultIteratorIT.tryWithFetchSize(new HashSet<String>(insertedKeys), numRows - 1, stmt, 0);
                RoundRobinResultIteratorIT.tryWithFetchSize(new HashSet<String>(insertedKeys), numRows, stmt, 0);
                RoundRobinResultIteratorIT.tryWithFetchSize(new HashSet<String>(insertedKeys), numRows + 1, stmt, 0);
            }
        }
    }

    /**
     * Creates a two-column table {@code (K VARCHAR PRIMARY KEY, V VARCHAR)} and upserts
     * {@code numRows} rows with keys {@code key1..keyN} and values {@code value1..valueN},
     * committing once at the end.
     *
     * @param tableName         name of the table to create
     * @param numRows           number of rows to insert
     * @param salted            when true the table is declared with {@code SALT_BUCKETS=4}
     * @param addTableNameToKey when true every key is prefixed with {@code tableName}
     * @return a new mutable set holding every inserted key
     * @throws Exception if the DDL, an upsert, or the commit fails
     */
    private Set<String> createTableAndInsertRows(String tableName, int numRows, boolean salted, boolean addTableNameToKey) throws Exception {
        String ddl = "CREATE TABLE " + tableName + " (K VARCHAR NOT NULL PRIMARY KEY, V VARCHAR)" + (salted ? "SALT_BUCKETS=4" : "");
        String dml = "UPSERT INTO " + tableName + " VALUES (?, ?)";
        String keyPrefix = (addTableNameToKey ? tableName : "") + "key";
        Set<String> expectedKeys = new HashSet<String>(numRows);
        // The connection and both statements are now closed on every exit path.
        try (Connection conn = RoundRobinResultIteratorIT.getConnection()) {
            try (Statement ddlStmt = conn.createStatement()) {
                ddlStmt.execute(ddl);
            }
            try (PreparedStatement stmt = conn.prepareStatement(dml)) {
                for (int i = 1; i <= numRows; ++i) {
                    String key = keyPrefix + i;
                    expectedKeys.add(key);
                    stmt.setString(1, key);
                    stmt.setString(2, "value" + i);
                    stmt.executeUpdate();
                }
                // Commit while the statement is still open, as the original code did.
                conn.commit();
            }
        }
        return expectedKeys;
    }

    /**
     * Runs a row-value-constructor filter {@code (K, V) > (?, ?)} bound to
     * {@code ("key0", "value0")} over a 4-row unsalted table with fetch sizes 1 to 4, expecting
     * all four keys back each time along with the given parallel-fetch counts.
     *
     * @throws Exception if table setup or any query fails
     */
    @Test
    public void testFetchSizesAndRVCExpression() throws Exception {
        String tableName = RoundRobinResultIteratorIT.generateUniqueName();
        Set<String> insertedKeys = Collections.unmodifiableSet(this.createTableAndInsertRows(tableName, 4, false, false));
        // Connection and statement are now released even when an assertion fails.
        try (Connection conn = RoundRobinResultIteratorIT.getConnection();
                PreparedStatement stmt = conn.prepareStatement("SELECT K FROM " + tableName + " WHERE (K, V)  > (?, ?)")) {
            stmt.setString(1, "key0");
            stmt.setString(2, "value0");
            RoundRobinResultIteratorIT.tryWithFetchSize(new HashSet<String>(insertedKeys), 1, stmt, 0);
            RoundRobinResultIteratorIT.tryWithFetchSize(new HashSet<String>(insertedKeys), 2, stmt, 2);
            RoundRobinResultIteratorIT.tryWithFetchSize(new HashSet<String>(insertedKeys), 3, stmt, 1);
            RoundRobinResultIteratorIT.tryWithFetchSize(new HashSet<String>(insertedKeys), 4, stmt, 1);
        }
    }

    /**
     * Executes {@code stmt} with the given fetch size, drains the result set, and asserts that
     * the number of rows equals the initial size of {@code expectedKeys}, that every expected
     * key was seen, and (via {@code assertRoundRobinBehavior}) that the iterator reports
     * {@code numFetches} parallel fetches.
     *
     * @param expectedKeys keys expected in column 1; this set is emptied as rows are read, so
     *                     callers must pass a copy they do not need afterwards
     * @param fetchSize    fetch size applied to {@code stmt} before execution
     * @param stmt         prepared statement with all parameters already bound
     * @param numFetches   expected number of parallel fetches
     * @throws Exception if the query fails
     */
    private static void tryWithFetchSize(Set<String> expectedKeys, int fetchSize, PreparedStatement stmt, int numFetches) throws Exception {
        stmt.setFetchSize(fetchSize);
        int expectedNumRows = expectedKeys.size();
        // The result set is closed before returning, so repeated calls on the same statement
        // do not accumulate open result sets.
        try (ResultSet rs = stmt.executeQuery()) {
            int numRows = 0;
            while (rs.next()) {
                expectedKeys.remove(rs.getString(1));
                ++numRows;
            }
            Assert.assertEquals("Number of rows didn't match", expectedNumRows, numRows);
            Assert.assertTrue("Not all rows were returned for fetch size: " + fetchSize + " - " + expectedKeys, expectedKeys.isEmpty());
            // Must run while rs is still open: it inspects the underlying iterator.
            RoundRobinResultIteratorIT.assertRoundRobinBehavior(rs, stmt, numFetches);
        }
    }

    /**
     * Creates a salted table with a small {@code MAX_FILESIZE}, fills it with one row per
     * two-letter lowercase key ({@code aa}..{@code zz}), each carrying a 1024-character
     * payload, and flushes the table.
     *
     * <p>Rows are committed in batches of 25 with a final commit for the remainder.
     *
     * @param tableName name of the table to create
     * @return the number of rows inserted
     * @throws Exception if the DDL, an upsert, a commit, or the flush fails
     */
    private static int setupTableForSplit(String tableName) throws Exception {
        int batchSize = 25;
        int maxFileSize = 10240;
        int payLoadSize = 1024;
        StringBuilder buf = new StringBuilder(payLoadSize);
        for (int i = 0; i < payLoadSize; ++i) {
            buf.append('a');
        }
        String payload = buf.toString();
        int rowCount = 0;
        // try-with-resources closes the connection even if an upsert or the flush throws;
        // previously conn.close() was only reached on the success path.
        try (Connection conn = RoundRobinResultIteratorIT.getConnection()) {
            try (Statement ddlStmt = conn.createStatement()) {
                ddlStmt.execute("CREATE TABLE " + tableName + "(a VARCHAR PRIMARY KEY, b VARCHAR) " + "MAX_FILESIZE" + "=" + maxFileSize + ", SALT_BUCKETS = " + 4);
            }
            try (PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES(?,?)")) {
                for (char c1 = 'a'; c1 <= 'z'; ++c1) {
                    for (char c2 = 'a'; c2 <= 'z'; ++c2) {
                        stmt.setString(1, String.valueOf(new char[] {c1, c2}));
                        stmt.setString(2, payload);
                        stmt.execute();
                        if (++rowCount % batchSize == 0) {
                            conn.commit();
                        }
                    }
                }
                // Commit whatever is left over from the last partial batch.
                conn.commit();
            }
            ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices();
            try (Admin admin = services.getAdmin()) {
                admin.flush(TableName.valueOf(tableName));
            }
        }
        return rowCount;
    }

    /**
     * Creates three tables (two salted, one unsalted) whose keys are prefixed with their table
     * name, runs a {@code UNION ALL} over them with fetch size 2, and asserts that every key of
     * every table is returned exactly the expected number of times.
     *
     * @throws Exception if table setup or the query fails
     */
    @Test
    public void testUnionAllSelects() throws Exception {
        int insertedRowsA = 10;
        int insertedRowsB = 5;
        int insertedRowsC = 7;
        String baseTableName = RoundRobinResultIteratorIT.generateUniqueName();
        String tableA = "TABLEA" + baseTableName;
        String tableB = "TABLEB" + baseTableName;
        String tableC = "TABLEC" + baseTableName;
        Set<String> keySetA = this.createTableAndInsertRows(tableA, insertedRowsA, true, true);
        Set<String> keySetB = this.createTableAndInsertRows(tableB, insertedRowsB, true, true);
        Set<String> keySetC = this.createTableAndInsertRows(tableC, insertedRowsC, false, true);
        String query = "SELECT K FROM " + tableA + " UNION ALL SELECT K FROM " + tableB + " UNION ALL SELECT K FROM " + tableC;
        int rowsA = 0;
        int rowsB = 0;
        int rowsC = 0;
        // Connection, statement and result set are now closed on every exit path.
        try (Connection conn = RoundRobinResultIteratorIT.getConnection();
                PreparedStatement stmt = conn.prepareStatement(query)) {
            stmt.setFetchSize(2);
            try (ResultSet rs = stmt.executeQuery()) {
                while (rs.next()) {
                    String key = rs.getString(1);
                    // The table-name prefix on each key identifies the originating table.
                    if (key.startsWith("TABLEA")) {
                        ++rowsA;
                    } else if (key.startsWith("TABLEB")) {
                        ++rowsB;
                    } else if (key.startsWith("TABLEC")) {
                        ++rowsC;
                    }
                    keySetA.remove(key);
                    keySetB.remove(key);
                    keySetC.remove(key);
                }
            }
        }
        Assert.assertEquals("Not all rows of tableA were returned", 0L, (long) keySetA.size());
        Assert.assertEquals("Not all rows of tableB were returned", 0L, (long) keySetB.size());
        Assert.assertEquals("Not all rows of tableC were returned", 0L, (long) keySetC.size());
        Assert.assertEquals("Number of rows retrieved didn't match for tableA", (long) insertedRowsA, (long) rowsA);
        Assert.assertEquals("Number of rows retrieved didnt match for tableB", (long) insertedRowsB, (long) rowsB);
        Assert.assertEquals("Number of rows retrieved didn't match for tableC", (long) insertedRowsC, (long) rowsC);
    }

    /**
     * Populates the {@code EVENTS} and {@code MAPPING} tables, then runs a join with
     * {@code GROUP BY} and {@code ORDER BY} at fetch size 3 and drains the result. Nothing is
     * asserted about the rows; the test passes when the query completes without throwing.
     *
     * <p>NOTE(review): the method name suggests a regression test for issue 2074 -- confirm
     * against the issue tracker. The table names are fixed rather than generated, so this test
     * presumably cannot share a cluster with another test creating the same names -- verify.
     *
     * @throws Exception if table setup or the query fails
     */
    @Test
    public void testBug2074() throws Exception {
        try (Connection conn = RoundRobinResultIteratorIT.getConnection()) {
            try (Statement ddlStmt = conn.createStatement()) {
                ddlStmt.execute("CREATE TABLE EVENTS   (id VARCHAR(10) PRIMARY KEY,     article VARCHAR(10),     misc VARCHAR(10))");
            }
            upsertStringRows(conn, "upsert into EVENTS(id, article, misc) values (?, ?, ?)", new String[][] {
                {"001", "A", "W"},
                {"002", "B", "X"},
                {"003", "C", "Y"},
                {"004", "D", "Z"},
            });

            try (Statement ddlStmt = conn.createStatement()) {
                ddlStmt.execute("CREATE TABLE MAPPING   (id VARCHAR(10) PRIMARY KEY,     article VARCHAR(10),     category VARCHAR(10))");
            }
            upsertStringRows(conn, "upsert into MAPPING(id, article, category) values (?, ?, ?)", new String[][] {
                {"002", "A", "X"},
                {"003", "B", "Y"},
                {"004", "C", "Z"},
                {"005", "E", "Z"},
                {"006", "C", "Z"},
                {"007", "C", "Z"},
            });

            String query = "select count(MAPPING.article) as cnt,MAPPING.category from EVENTS join MAPPING on MAPPING.article = EVENTS.article group by category order by cnt";
            try (PreparedStatement statement = conn.prepareStatement(query)) {
                statement.setFetchSize(3);
                try (ResultSet rs = statement.executeQuery()) {
                    while (rs.next()) {
                        // Intentionally empty: only draining the result set matters here.
                    }
                }
            }
        }
    }

    /**
     * Executes {@code upsertSql} once per row, binding the row's values to parameters
     * 1..n in order, then commits on {@code conn}.
     *
     * @param conn      connection used to prepare the statement and to commit
     * @param upsertSql parameterized upsert with one {@code ?} per column of each row
     * @param rows      string values to bind, one inner array per row
     * @throws SQLException if preparing, executing, or committing fails
     */
    private static void upsertStringRows(Connection conn, String upsertSql, String[][] rows) throws SQLException {
        try (PreparedStatement upsertStmt = conn.prepareStatement(upsertSql)) {
            for (String[] row : rows) {
                for (int col = 0; col < row.length; ++col) {
                    // JDBC parameter indexes are 1-based.
                    upsertStmt.setString(col + 1, row[col]);
                }
                upsertStmt.execute();
            }
            conn.commit();
        }
    }

    /**
     * Unwraps {@code rs} to a {@link PhoenixResultSet} and returns its underlying iterator.
     *
     * @param rs result set to unwrap
     * @return the iterator backing {@code rs}
     * @throws SQLException if {@code rs} cannot be unwrapped to a {@link PhoenixResultSet}
     */
    private static ResultIterator getResultIterator(ResultSet rs) throws SQLException {
        PhoenixResultSet phoenixResultSet = rs.unwrap(PhoenixResultSet.class);
        return phoenixResultSet.getUnderlyingIterator();
    }

    /**
     * When the statement's fetch size is greater than 1, asserts that the iterator behind
     * {@code rs} is a {@link RoundRobinResultIterator} and that it reports {@code numFetches}
     * parallel fetches. For fetch sizes of 1 or less nothing is asserted.
     *
     * @param rs         result set whose underlying iterator is inspected
     * @param stmt       statement that produced {@code rs}; only its fetch size is read
     * @param numFetches expected number of parallel fetches
     * @throws SQLException if the result set cannot be unwrapped or the fetch size cannot be read
     */
    private static void assertRoundRobinBehavior(ResultSet rs, Statement stmt, int numFetches) throws SQLException {
        // Resolved before the guard so the unwrap is exercised for every fetch size.
        ResultIterator underlying = RoundRobinResultIteratorIT.getResultIterator(rs);
        if (stmt.getFetchSize() <= 1) {
            return;
        }
        Assert.assertTrue(underlying instanceof RoundRobinResultIterator);
        long actualFetches = ((RoundRobinResultIterator) underlying).getNumberOfParallelFetches();
        Assert.assertEquals((long) numFetches, actualFetches);
    }

    /**
     * Creates a table with 8 salt buckets, installs a {@link MockParallelIteratorFactory} on
     * the connection, and reads two rounds of rows (2 x the number of splits). The value read
     * at position {@code i} of the first round must equal the value at position {@code i} of
     * the second round.
     *
     * @throws Exception if table setup or the query fails
     */
    @Test
    public void testIteratorsPickedInRoundRobinFashionForSaltedTable() throws Exception {
        try (Connection conn = RoundRobinResultIteratorIT.getConnection()) {
            String testTable = "testIteratorsPickedInRoundRobinFashionForSaltedTable".toUpperCase();
            Statement stmt = conn.createStatement();
            stmt.execute("CREATE TABLE " + testTable + "(K VARCHAR PRIMARY KEY) SALT_BUCKETS = 8");

            // The mock factory must be installed before the query is executed.
            MockParallelIteratorFactory mockFactory = new MockParallelIteratorFactory();
            conn.unwrap(PhoenixConnection.class).setIteratorFactory(mockFactory);

            ResultSet rs = stmt.executeQuery("SELECT * FROM " + testTable);
            StatementContext context = rs.unwrap(PhoenixResultSet.class).getContext();
            TableRef firstTableRef = (TableRef) context.getResolver().getTables().get(0);
            mockFactory.setTable(firstTableRef.getTable());

            int splitCount = stmt.unwrap(PhoenixStatement.class).getQueryPlan().getSplits().size();
            Assert.assertEquals(8L, (long) splitCount);

            // Read two full rounds and remember the value produced at each position.
            int totalReads = 2 * splitCount;
            ArrayList<String> valuesInReadOrder = new ArrayList<String>(totalReads);
            int reads = 0;
            while (reads < totalReads) {
                rs.next();
                valuesInReadOrder.add(rs.getString(1));
                reads++;
            }

            // Position i of the second round must repeat position i of the first round.
            int slot = 0;
            while (slot < splitCount) {
                Assert.assertEquals(valuesInReadOrder.get(slot), valuesInReadOrder.get(slot + splitCount));
                slot++;
            }
        }
    }
}

