/*
 * Decompiled with CFR 0.152.
 */
package org.apache.phoenix.end2end.join;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.Random;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
import org.apache.phoenix.cache.GlobalCache;
import org.apache.phoenix.cache.TenantCache;
import org.apache.phoenix.coprocessorclient.HashJoinCacheNotFoundException;
import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
import org.apache.phoenix.end2end.join.BaseJoinIT;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.join.HashJoinInfo;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.TestUtil;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Integration tests asserting that a join query fails with
 * {@link HashJoinCacheNotFoundException} once the server-side cache entries for its
 * join ids have been removed.
 *
 * <p>Every table resolved through {@link #getTableName(Connection, String)} gets the
 * {@link InvalidateHashCache} coprocessor attached; that coprocessor removes the server
 * cache entry for each join id the first time it sees it on a scanner open.
 */
@Category(NeedsOwnMiniClusterTest.class)
public class HashJoinCacheIT extends BaseJoinIT {

    /**
     * Resolves the physical table name through the superclass and attaches
     * {@link InvalidateHashCache} to that table before returning it.
     *
     * @param conn connection used to resolve the name and to register the coprocessor
     * @param virtualName logical (quoted) table name understood by the superclass
     * @return the name returned by {@code super.getTableName(conn, virtualName)}
     * @throws Exception if name resolution or coprocessor registration fails
     */
    @Override
    protected String getTableName(Connection conn, String virtualName) throws Exception {
        String realName = super.getTableName(conn, virtualName);
        TestUtil.addCoprocessor(conn, SchemaUtil.normalizeFullTableName(realName), InvalidateHashCache.class);
        return realName;
    }

    /** Supplier RIGHT JOIN item must fail with {@link HashJoinCacheNotFoundException}. */
    @Test(expected=HashJoinCacheNotFoundException.class)
    public void testExpiredCache() throws Exception {
        runJoinExpectingMissingCache("RIGHT JOIN",
                "HashJoinCacheNotFoundException was not thrown or incorrectly handled");
    }

    /** Supplier LEFT JOIN item must fail with {@link HashJoinCacheNotFoundException}. */
    @Test(expected=HashJoinCacheNotFoundException.class)
    public void testExpiredCacheWithLeftJoin() throws Exception {
        runJoinExpectingMissingCache("LEFT JOIN",
                "HashJoinCacheNotFoundException was not thrown");
    }

    /** Supplier INNER JOIN item must fail with {@link HashJoinCacheNotFoundException}. */
    @Test(expected=HashJoinCacheNotFoundException.class)
    public void testExpiredCacheWithInnerJoin() throws Exception {
        runJoinExpectingMissingCache("INNER JOIN",
                "HashJoinCacheNotFoundException was not thrown as expected");
    }

    /**
     * Runs the supplier/item join with the given join keyword and fails the test if the
     * query manages to execute and advance to its first row.
     *
     * <p>The connection is opened with
     * {@code phoenix.coprocessor.maxServerCacheTimeToLiveMs} set to {@code "1"}.
     * The connection, statement and result set are all managed by try-with-resources so
     * they are released in reverse order of acquisition (result set, statement,
     * connection) and a failure while closing cannot replace the exception thrown by the
     * query itself.
     *
     * @param joinType SQL join keyword placed between the two tables, e.g. {@code "LEFT JOIN"}
     * @param failureMessage message passed to {@link Assert#fail(String)} when no exception
     *        was raised by the query
     * @throws Exception whatever the query raises; callers expect
     *         {@link HashJoinCacheNotFoundException}
     */
    private void runJoinExpectingMissingCache(String joinType, String failureMessage) throws Exception {
        Properties props = PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
        props.setProperty("phoenix.coprocessor.maxServerCacheTimeToLiveMs", "1");
        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
            String tableName1 = getTableName(conn, "\"Join\".\"SupplierTable\"");
            String tableName2 = getTableName(conn, "\"Join\".\"ItemTable\"");
            String query = "SELECT item.\"item_id\", item.name, supp.\"supplier_id\", supp.name FROM "
                    + tableName1 + " supp " + joinType + " " + tableName2
                    + " item ON item.\"supplier_id\" = supp.\"supplier_id\" ORDER BY \"item_id\"";
            // PreparedStatement (not Statement) is required for the no-argument executeQuery().
            try (PreparedStatement statement = conn.prepareStatement(query);
                    ResultSet rs = statement.executeQuery()) {
                rs.next();
                // Reaching this point means the query ran without the expected exception.
                Assert.fail(failureMessage);
            }
        }
    }

    /**
     * Region observer that removes hash-join server cache entries when a scanner is opened.
     *
     * <p>Join ids that have already been removed are remembered in
     * {@link #lastRemovedJoinIds}, so each id is removed at most once for the lifetime of
     * this class in the hosting JVM.
     */
    public static class InvalidateHashCache extends SimpleRegionObserver {
        /** Not referenced anywhere in this class; kept because it is part of the public surface. */
        public static Random rand = new Random();
        /** Join ids whose server cache entry this observer has already removed. */
        public static List<ImmutableBytesPtr> lastRemovedJoinIds = new ArrayList<>();

        /**
         * Removes the server cache entry of every not-yet-seen join id carried by the scan.
         * Scans that carry no hash-join info are left untouched.
         *
         * <p>NOTE(review): presumably overrides the region observer's scanner-open hook;
         * the superclass signature is not visible here, so confirm before adding
         * {@code @Override}.
         *
         * @param c observer context supplying the region coprocessor environment
         * @param scan scan from which the hash-join info is deserialized
         */
        public void preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Scan scan) {
            HashJoinInfo joinInfo = HashJoinInfo.deserializeHashJoinFromScan(scan);
            if (joinInfo == null) {
                return;
            }
            // null is passed as the second argument (presumably the tenant id - confirm).
            TenantCache cache = GlobalCache.getTenantCache(c.getEnvironment(), null);
            for (ImmutableBytesPtr joinId : joinInfo.getJoinIds()) {
                if (ByteUtil.contains(lastRemovedJoinIds, joinId)) {
                    // Already removed once; leave any later cache entry for this id alone.
                    continue;
                }
                lastRemovedJoinIds.add(joinId);
                cache.removeServerCache(joinId);
            }
        }
    }
}

