/*
 * Decompiled with CFR 0.152.
 */
package org.apache.phoenix.end2end;

import java.io.IOException;
import java.sql.Array;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLTimeoutException;
import java.sql.Statement;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
import org.apache.phoenix.end2end.ParallelStatsDisabledTest;
import org.apache.phoenix.iterate.TestingMapReduceParallelScanGrouper;
import org.apache.phoenix.mapreduce.PhoenixOutputFormat;
import org.apache.phoenix.mapreduce.PhoenixTestingInputFormat;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;
import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.schema.types.PDouble;
import org.apache.phoenix.schema.types.PhoenixArray;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category(value={ParallelStatsDisabledTest.class})
public class MapReduceIT
extends ParallelStatsDisabledIT {
    private static final String STOCK_NAME = "STOCK_NAME";
    private static final String RECORDING_YEAR = "RECORDING_YEAR";
    private static final String RECORDINGS_QUARTER = "RECORDINGS_QUARTER";
    private static final String CREATE_STOCK_TABLE = "CREATE TABLE IF NOT EXISTS %s (  STOCK_NAME VARCHAR NOT NULL , RECORDING_YEAR  INTEGER NOT  NULL,  RECORDINGS_QUARTER  DOUBLE array[] CONSTRAINT pk PRIMARY KEY ( STOCK_NAME, RECORDING_YEAR )) SPLIT ON ('AA')";
    private static final String CREATE_STOCK_VIEW = "CREATE VIEW IF NOT EXISTS %s (v1 VARCHAR) AS  SELECT * FROM %s WHERE RECORDING_YEAR = 2008";
    private static final String MAX_RECORDING = "MAX_RECORDING";
    private static final String CREATE_STOCK_STATS_TABLE = "CREATE TABLE IF NOT EXISTS %s(STOCK_NAME VARCHAR NOT NULL ,  MAX_RECORDING DOUBLE CONSTRAINT pk PRIMARY KEY (STOCK_NAME ))";
    private static final String UPSERT = "UPSERT into %s values (?, ?, ?)";
    private static final String TENANT_ID = "1234567890";

    /**
     * Per-test setup hook. Currently a no-op: the tables each test needs are created
     * with unique names inside the job helper methods of this class.
     */
    @Before
    public void setupTables() throws Exception {
    }

    /**
     * Per-test teardown: samples the store ref-count leak flag, resets the
     * getRegionBoundaries call counter of the testing scan grouper, and then
     * fails the test if a leak was observed.
     */
    @After
    public void clearCountersForScanGrouper() throws Exception {
        // Sample the flag first, but reset the counter before asserting so the
        // reset still happens when the assertion below fails.
        final boolean leakDetected = isAnyStoreRefCountLeaked();
        TestingMapReduceParallelScanGrouper.clearNumCallsToGetRegionBoundaries();
        Assert.assertFalse("refCount leaked", leakDetected);
    }

    /**
     * Runs the job over the base table with no WHERE condition and no tenant;
     * the expected maximum recording is 91.04.
     */
    @Test
    public void testNoConditionsOnSelect() throws Exception {
        final String noWhereCondition = null;
        final String noTenant = null;
        try (Connection connection = DriverManager.getConnection(getUrl())) {
            createAndTestJob(connection, noWhereCondition, 91.04, noTenant);
        }
    }

    /**
     * Runs the job over the base table restricted to years before 2009;
     * the expected maximum recording is 81.04.
     */
    @Test
    public void testConditionsOnSelect() throws Exception {
        final String whereCondition = "RECORDING_YEAR  < 2009";
        final String noTenant = null;
        try (Connection connection = DriverManager.getConnection(getUrl())) {
            createAndTestJob(connection, whereCondition, 81.04, noTenant);
        }
    }

    /**
     * Runs the paged job against the base table in the very-small-timeout mode,
     * where the helper expects the job to fail.
     */
    @Test
    public void testMapReduceWithVerySmallPhoenixQueryTimeout() throws Exception {
        final String whereCondition = "RECORDING_YEAR % 2 = 0";
        final String noTenant = null;
        final boolean verySmallTimeout = true;
        try (Connection connection = DriverManager.getConnection(getUrl())) {
            createPagedJobAndTestFailedJobDueToTimeOut(connection, whereCondition, 82.89, noTenant, verySmallTimeout);
        }
    }

    /**
     * Runs the paged job through a tenant view in the very-small-timeout mode,
     * where the helper expects the job to fail.
     */
    @Test
    public void testMapReduceWithVerySmallPhoenixQueryTimeoutWithTenantId() throws Exception {
        final String whereCondition = "RECORDING_YEAR % 2 = 0";
        final boolean verySmallTimeout = true;
        try (Connection connection = DriverManager.getConnection(getUrl())) {
            createPagedJobAndTestFailedJobDueToTimeOut(connection, whereCondition, 82.89, TENANT_ID, verySmallTimeout);
        }
    }

    /**
     * Runs the paged job against the base table with the normal timeout;
     * over the even years the expected maximum recording is 82.89.
     */
    @Test
    public void testMapReduceWithNormalPhoenixQueryTimeout() throws Exception {
        final String whereCondition = "RECORDING_YEAR % 2 = 0";
        final String noTenant = null;
        final boolean verySmallTimeout = false;
        try (Connection connection = DriverManager.getConnection(getUrl())) {
            createPagedJobAndTestFailedJobDueToTimeOut(connection, whereCondition, 82.89, noTenant, verySmallTimeout);
        }
    }

    /**
     * Runs the paged job through a tenant view with the normal timeout. The
     * view only exposes RECORDING_YEAR = 2008, so the expected maximum is 81.04.
     */
    @Test
    public void testMapReduceWithNormalPhoenixQueryTimeoutWithTenantId() throws Exception {
        final String whereCondition = "RECORDING_YEAR % 2 = 0";
        final boolean verySmallTimeout = false;
        try (Connection connection = DriverManager.getConnection(getUrl())) {
            createPagedJobAndTestFailedJobDueToTimeOut(connection, whereCondition, 81.04, TENANT_ID, verySmallTimeout);
        }
    }

    /**
     * Runs the job through a tenant view with no extra WHERE condition. The
     * view only exposes RECORDING_YEAR = 2008, so the expected maximum is 81.04.
     */
    @Test
    public void testWithTenantId() throws Exception {
        final String noWhereCondition = null;
        try (Connection connection = DriverManager.getConnection(getUrl())) {
            createAndTestJob(connection, noWhereCondition, 81.04, TENANT_ID);
        }
    }

    /**
     * Creates a uniquely named stock table and stats table, configures a job that
     * reads the stock table (or a tenant view over it), and hands the job to
     * {@code testJob} to run and verify.
     *
     * @param conn connection used for the DDL and for loading the test rows
     * @param whereCondition WHERE condition for the job's input query; may be {@code null}
     * @param maxExpected maximum recording the job is expected to store
     * @param tenantId tenant to read as, or {@code null} to read the base table directly
     */
    private void createAndTestJob(Connection conn, String whereCondition, double maxExpected, String tenantId) throws SQLException, IOException, InterruptedException, ClassNotFoundException {
        String stockTableName = generateUniqueName();
        String stockStatsTableName = generateUniqueName();
        // Run both DDL statements on one Statement and close it, instead of
        // leaking a new Statement per createStatement() call.
        try (Statement ddl = conn.createStatement()) {
            ddl.execute(String.format(CREATE_STOCK_TABLE, stockTableName));
            ddl.execute(String.format(CREATE_STOCK_STATS_TABLE, stockStatsTableName));
        }
        conn.commit();
        Configuration conf = getUtility().getConfiguration();
        Job job = Job.getInstance(conf);
        if (tenantId != null) {
            setInputForTenant(job, tenantId, stockTableName, whereCondition);
        } else {
            PhoenixMapReduceUtil.setInput(job, StockWritable.class, PhoenixTestingInputFormat.class, stockTableName, whereCondition, new String[]{STOCK_NAME, RECORDING_YEAR, "0.RECORDINGS_QUARTER"});
        }
        testJob(conn, job, stockTableName, stockStatsTableName, maxExpected);
    }

    /**
     * Creates the stock and stats tables, configures a paged job with either a
     * 1 ms or a 600000 ms Phoenix query timeout, runs it, and verifies the outcome.
     * <p>
     * With the very small timeout the job must either report failure or throw a
     * {@link RuntimeException} caused by a {@link SQLTimeoutException}. With the
     * normal timeout the job must succeed and store exactly one row
     * ({@code "AAPL"}, {@code maxExpected}) in the stats table.
     *
     * @param conn connection used for the DDL and for loading the test rows
     * @param whereCondition WHERE condition for the job's input query; may be {@code null}
     * @param maxExpected maximum recording expected in the stats table (normal timeout only)
     * @param tenantId tenant to read as, or {@code null} to read the base table directly
     * @param testVerySmallTimeOut {@code true} to use the 1 ms timeout and expect failure
     */
    private void createPagedJobAndTestFailedJobDueToTimeOut(Connection conn, String whereCondition, double maxExpected, String tenantId, boolean testVerySmallTimeOut) throws SQLException, IOException, InterruptedException, ClassNotFoundException {
        String stockTableName = generateUniqueName();
        String stockStatsTableName = generateUniqueName();
        // Close the DDL statement instead of leaking one per createStatement() call.
        try (Statement ddl = conn.createStatement()) {
            ddl.execute(String.format(CREATE_STOCK_TABLE, stockTableName));
            ddl.execute(String.format(CREATE_STOCK_STATS_TABLE, stockStatsTableName));
        }
        conn.commit();

        // Work on a copy so the timeout overrides do not touch the shared configuration.
        Configuration conf = new Configuration(getUtility().getConfiguration());
        // The page size is 0 ms in both modes; only the query timeout differs.
        conf.set("phoenix.server.page.size.ms", Integer.toString(0));
        conf.set("phoenix.query.timeoutMs", Integer.toString(testVerySmallTimeOut ? 1 : 600000));

        Job job = Job.getInstance(conf);
        if (tenantId != null) {
            setInputForTenant(job, tenantId, stockTableName, whereCondition);
        } else {
            PhoenixMapReduceUtil.setInput(job, StockWritable.class, PhoenixTestingInputFormat.class, stockTableName, whereCondition, new String[]{STOCK_NAME, RECORDING_YEAR, "0.RECORDINGS_QUARTER"});
        }
        Assert.assertEquals("Failed to reset getRegionBoundaries counter for scanGrouper", 0L, (long)TestingMapReduceParallelScanGrouper.getNumCallsToGetRegionBoundaries());
        upsertData(conn, stockTableName);
        job.getConfiguration().set("mapreduce.framework.name", "local");
        setOutput(job, stockStatsTableName);
        job.setMapperClass(StockMapper.class);
        job.setReducerClass(StockReducer.class);
        job.setOutputFormatClass(PhoenixOutputFormat.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(DoubleWritable.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(StockWritable.class);

        if (testVerySmallTimeOut) {
            try {
                Assert.assertFalse("Job should fail with QueryTimeout.", job.waitForCompletion(true));
            }
            catch (RuntimeException e) {
                Assert.assertTrue("Job execution failed with unexpected error.", e.getCause() instanceof SQLTimeoutException);
            }
            return;
        }

        Assert.assertTrue("Job didn't complete successfully! Check logs for reason.", job.waitForCompletion(true));
        // Close the verification connection, statement and result set even when an
        // assertion fails; previously all three were leaked.
        try (Connection statsConn = DriverManager.getConnection(getUrl());
             Statement statsStmt = statsConn.createStatement();
             ResultSet stats = statsStmt.executeQuery("SELECT * FROM " + stockStatsTableName)) {
            Assert.assertTrue("No data stored in stats table!", stats.next());
            String name = stats.getString(1);
            double max = stats.getDouble(2);
            Assert.assertEquals("Got the wrong stock name!", "AAPL", name);
            Assert.assertEquals("Max value didn't match the expected!", maxExpected, max, 0.0);
            Assert.assertFalse("Should only have stored one row in stats table!", stats.next());
        }
        Assert.assertEquals("There should have been only be 1 call to getRegionBoundaries (corresponding to the driver code)", 1L, (long)TestingMapReduceParallelScanGrouper.getNumCallsToGetRegionBoundaries());
    }

    /**
     * Configures {@code job} to read as a tenant: sets the tenant id on the job,
     * creates a uniquely named tenant view over the stock table, and registers
     * that view as the job input.
     *
     * @param job job whose input is being configured
     * @param tenantId tenant id for both the view-creating connection and the job;
     *        must not be {@code null}
     * @param stockTableName base table the tenant view selects from
     * @param s WHERE condition for the job's input query; may be {@code null}
     */
    private void setInputForTenant(Job job, String tenantId, String stockTableName, String s) throws SQLException {
        Properties props = new Properties();
        // Use the caller's tenant id, not the TENANT_ID constant, so the view is
        // created for the same tenant the job is configured with.
        props.setProperty("TenantId", tenantId);
        try (Connection tenantConn = DriverManager.getConnection(getUrl(), props)) {
            PhoenixMapReduceUtil.setTenantId(job, tenantId);
            String stockViewName = generateUniqueName();
            // Close the DDL statement instead of leaking it.
            try (Statement ddl = tenantConn.createStatement()) {
                ddl.execute(String.format(CREATE_STOCK_VIEW, stockViewName, stockTableName));
            }
            tenantConn.commit();
            PhoenixMapReduceUtil.setInput(job, StockWritable.class, PhoenixTestingInputFormat.class, stockViewName, s, new String[]{STOCK_NAME, RECORDING_YEAR, "0.RECORDINGS_QUARTER"});
        }
    }

    /**
     * Loads the test rows, finishes configuring {@code job} (local framework,
     * mapper, reducer, output), runs it, and verifies that the stats table holds
     * exactly one row ({@code "AAPL"}, {@code expectedMax}).
     * <p>
     * Also checks that getRegionBoundaries was not called before the job ran
     * and was called exactly once by the time it finished.
     *
     * @param conn connection used to load the test rows
     * @param job job whose input has already been configured
     * @param stockTableName table to load the test rows into
     * @param stockStatsTableName table the job writes its result to
     * @param expectedMax maximum recording expected in the stats table
     */
    private void testJob(Connection conn, Job job, String stockTableName, String stockStatsTableName, double expectedMax) throws SQLException, InterruptedException, IOException, ClassNotFoundException {
        Assert.assertEquals("Failed to reset getRegionBoundaries counter for scanGrouper", 0L, (long)TestingMapReduceParallelScanGrouper.getNumCallsToGetRegionBoundaries());
        upsertData(conn, stockTableName);
        job.getConfiguration().set("mapreduce.framework.name", "local");
        setOutput(job, stockStatsTableName);
        job.setMapperClass(StockMapper.class);
        job.setReducerClass(StockReducer.class);
        job.setOutputFormatClass(PhoenixOutputFormat.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(DoubleWritable.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(StockWritable.class);
        Assert.assertTrue("Job didn't complete successfully! Check logs for reason.", job.waitForCompletion(true));
        // Close the verification connection, statement and result set even when an
        // assertion fails; previously all three were leaked.
        try (Connection statsConn = DriverManager.getConnection(getUrl());
             Statement statsStmt = statsConn.createStatement();
             ResultSet stats = statsStmt.executeQuery("SELECT * FROM " + stockStatsTableName)) {
            Assert.assertTrue("No data stored in stats table!", stats.next());
            String name = stats.getString(1);
            double max = stats.getDouble(2);
            Assert.assertEquals("Got the wrong stock name!", "AAPL", name);
            Assert.assertEquals("Max value didn't match the expected!", expectedMax, max, 0.0);
            Assert.assertFalse("Should only have stored one row in stats table!", stats.next());
        }
        Assert.assertEquals("There should have been only be 1 call to getRegionBoundaries (corresponding to the driver code)", 1L, (long)TestingMapReduceParallelScanGrouper.getNumCallsToGetRegionBoundaries());
    }

    /**
     * Points the job's output at the stats table: records the output table name,
     * stores the UPSERT statement under {@code phoenix.upsert.stmt}, and selects
     * {@link PhoenixOutputFormat} as the output format.
     *
     * @param job job to configure
     * @param stockStatsTableName table the job writes its result to
     */
    private void setOutput(Job job, String stockStatsTableName) {
        final Configuration jobConf = job.getConfiguration();
        final String columnList = " (" + STOCK_NAME + ", " + MAX_RECORDING + ") values (?,?)";
        final String upsertSql = "UPSERT into " + stockStatsTableName + columnList;

        PhoenixConfigurationUtil.setOutputTableName(jobConf, stockStatsTableName);
        jobConf.set("phoenix.upsert.stmt", upsertSql);
        job.setOutputFormatClass(PhoenixOutputFormat.class);
    }

    /**
     * Loads four AAPL rows (years 2007 through 2010, four quarterly recordings
     * each) into the given stock table and commits them.
     *
     * @param conn connection to upsert and commit on
     * @param stockTableName table to load the rows into
     */
    private void upsertData(Connection conn, String stockTableName) throws SQLException {
        // Close the prepared statement when done; previously it was leaked.
        try (PreparedStatement stmt = conn.prepareStatement(String.format(UPSERT, stockTableName))) {
            upsertData(stmt, "AAPL", 2010, new Double[]{73.48, 82.25, 75.2, 82.89});
            upsertData(stmt, "AAPL", 2009, new Double[]{85.88, 91.04, 88.5, 90.3});
            upsertData(stmt, "AAPL", 2008, new Double[]{75.88, 81.04, 78.5, 80.3});
            upsertData(stmt, "AAPL", 2007, new Double[]{73.88, 80.24, 78.9, 66.3});
            conn.commit();
        }
    }

    /**
     * Binds one stock row to the prepared UPSERT and executes it. Parameter 1 is
     * the stock name, 2 the year, and 3 the recordings as a Phoenix DOUBLE array.
     *
     * @param stmt prepared three-parameter UPSERT statement
     * @param name stock name
     * @param year recording year
     * @param data recordings for that year
     */
    private void upsertData(PreparedStatement stmt, String name, int year, Double[] data) throws SQLException {
        stmt.setString(1, name);
        stmt.setInt(2, year);
        Array quarterlyValues = new PhoenixArray.PrimitiveDoublePhoenixArray((PDataType)PDouble.INSTANCE, (Object[])data);
        stmt.setArray(3, quarterlyValues);
        stmt.execute();
    }

    public static class StockReducer
    extends Reducer<Text, DoubleWritable, NullWritable, StockWritable> {
        protected void reduce(Text key, Iterable<DoubleWritable> recordings, Reducer.Context context) throws IOException, InterruptedException {
            double maxPrice = Double.MIN_VALUE;
            for (DoubleWritable recording : recordings) {
                if (!(maxPrice < recording.get())) continue;
                maxPrice = recording.get();
            }
            StockWritable stock = new StockWritable();
            stock.setStockName(key.toString());
            stock.setMaxPrice(maxPrice);
            context.write((Object)NullWritable.get(), (Object)stock);
        }
    }

    public static class StockMapper
    extends Mapper<NullWritable, StockWritable, Text, DoubleWritable> {
        private Text stock = new Text();
        private DoubleWritable price = new DoubleWritable();

        protected void map(NullWritable key, StockWritable stockWritable, Mapper.Context context) throws IOException, InterruptedException {
            double[] recordings = stockWritable.getRecordings();
            String stockName = stockWritable.getStockName();
            double maxPrice = Double.MIN_VALUE;
            for (double recording : recordings) {
                if (!(maxPrice < recording)) continue;
                maxPrice = recording;
            }
            this.stock.set(stockName);
            this.price.set(maxPrice);
            context.write((Object)this.stock, (Object)this.price);
        }
    }

    /**
     * Record type used on both sides of the job. As input it carries a stock name
     * and its recordings read from a result set; as output it carries a stock name
     * and a maximum price bound to a prepared statement.
     */
    public static class StockWritable
    implements DBWritable {
        private String stockName;
        private double[] recordings;
        private double maxPrice;

        /** Returns the stock name read from the input row or set via the setter. */
        public String getStockName() {
            return stockName;
        }

        /** Sets the stock name written as parameter 1 by {@link #write}. */
        public void setStockName(String stockName) {
            this.stockName = stockName;
        }

        /** Returns the recordings populated by {@link #readFields}. */
        public double[] getRecordings() {
            return recordings;
        }

        /** Sets the maximum price written as parameter 2 by {@link #write}. */
        public void setMaxPrice(double maxPrice) {
            this.maxPrice = maxPrice;
        }

        /**
         * Populates the stock name and recordings from the STOCK_NAME and
         * RECORDINGS_QUARTER columns of the current row.
         */
        public void readFields(ResultSet rs) throws SQLException {
            stockName = rs.getString(STOCK_NAME);
            Array quarterly = rs.getArray(RECORDINGS_QUARTER);
            Object rawValues = quarterly.getArray();
            recordings = (double[])rawValues;
        }

        /** Binds the stock name to parameter 1 and the maximum price to parameter 2. */
        public void write(PreparedStatement pstmt) throws SQLException {
            pstmt.setString(1, stockName);
            pstmt.setDouble(2, maxPrice);
        }
    }
}

