/*
 * Decompiled with CFR 0.152.
 */
package org.apache.phoenix.end2end;

import java.io.IOException;
import java.sql.Array;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
import java.util.Random;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.SnapshotDescription;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.phoenix.end2end.MapReduceIT;
import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
import org.apache.phoenix.iterate.TestingMapReduceParallelScanGrouper;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.mapreduce.PhoenixOutputFormat;
import org.apache.phoenix.mapreduce.PhoenixTestingInputFormat;
import org.apache.phoenix.mapreduce.index.PhoenixIndexDBWritable;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;
import org.apache.phoenix.query.BaseTest;
import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.schema.types.PDouble;
import org.apache.phoenix.schema.types.PhoenixArray;
import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
import org.apache.phoenix.util.ReadOnlyProps;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category(value={NeedsOwnMiniClusterTest.class})
@RunWith(value=Parameterized.class)
public class TableSnapshotReadsMapReduceIT
extends BaseTest {
    private static final Logger LOGGER = LoggerFactory.getLogger(TableSnapshotReadsMapReduceIT.class);
    // Column names of the stock and stock-statistics tables (see CREATE_STOCK_TABLE / CREATE_STOCK_STATS_TABLE).
    private static final String STOCK_NAME = "STOCK_NAME";
    private static final String RECORDING_YEAR = "RECORDING_YEAR";
    private static final String RECORDINGS_QUARTER = "RECORDINGS_QUARTER";
    private static final String MAX_RECORDING = "MAX_RECORDING";
    // Name of the HBase snapshot that upsertAndSnapshot() creates and deleteSnapshotIfExists() removes.
    private static final String SNAPSHOT_NAME = "FOO";
    // Column names of the per-test table created from CREATE_TABLE.
    private static final String FIELD1 = "FIELD1";
    private static final String FIELD2 = "FIELD2";
    private static final String FIELD3 = "FIELD3";
    // DDL / DML templates; the single %s placeholder is the table name.
    private static final String CREATE_TABLE = "CREATE TABLE IF NOT EXISTS %s (  FIELD1 VARCHAR NOT NULL , FIELD2 VARCHAR , FIELD3 INTEGER CONSTRAINT pk PRIMARY KEY (FIELD1 ))";
    private static final String UPSERT = "UPSERT into %s values (?, ?, ?)";
    private static final String CREATE_STOCK_TABLE = "CREATE TABLE IF NOT EXISTS %s ( STOCK_NAME VARCHAR NOT NULL , RECORDING_YEAR  INTEGER NOT  NULL,  RECORDINGS_QUARTER  DOUBLE array[] CONSTRAINT pk PRIMARY KEY ( STOCK_NAME, RECORDING_YEAR )) SPLIT ON ('AA')";
    private static final String CREATE_STOCK_STATS_TABLE = "CREATE TABLE IF NOT EXISTS %s(STOCK_NAME VARCHAR NOT NULL , MAX_RECORDING DOUBLE CONSTRAINT pk PRIMARY KEY (STOCK_NAME ))";
    // Rows collected by TableSnapshotMapper.map(); replaced with a fresh list in configureJob() before the job runs.
    private static List<List<Object>> result;
    // Wall-clock time captured in upsertAndSnapshot() right after the snapshot is taken; used as "CurrentSCN" in configureJob().
    private long timestamp;
    // Per-test state initialised in before().
    private String tableName;
    private Job job;
    private Path tmpDir;
    private Configuration conf;
    // Source of the FIELD3 values written by upsertDataBeforeSplit(); assigned in the static initializer.
    private static final Random RANDOM;
    // Test parameter: when true, upsertAndSnapshot() restores the snapshot itself before the job runs.
    private Boolean isSnapshotRestoreDoneExternally;

    /**
     * Creates one parameterized instance of the test.
     *
     * @param isSnapshotRestoreDoneExternally value supplied by
     *        {@link #snapshotRestoreDoneExternallyParams()}; stored for use by
     *        {@code upsertAndSnapshot} and {@code assertRestoreDirCount}
     */
    public TableSnapshotReadsMapReduceIT(Boolean isSnapshotRestoreDoneExternally) {
        this.isSnapshotRestoreDoneExternally = isSnapshotRestoreDoneExternally;
    }

    /**
     * Supplies the values of the single constructor parameter: every test is run
     * once with {@code true} and once with {@code false}.
     *
     * @return the two parameter values, {@code true} first
     */
    @Parameterized.Parameters
    public static synchronized Collection<Boolean> snapshotRestoreDoneExternallyParams() {
        return Arrays.asList(Boolean.TRUE, Boolean.FALSE);
    }

    /**
     * One-time cluster setup: starts the test driver with an empty set of
     * override properties and then switches the HBase balancer off.
     *
     * @throws Exception if the driver cannot be set up or the balancer switch fails
     */
    @BeforeClass
    public static synchronized void doSetup() throws Exception {
        // Parameterized map so the entry iterator handed to ReadOnlyProps is type-checked
        // instead of going through an unchecked raw-type conversion.
        HashMap<String, String> props = Maps.newHashMapWithExpectedSize(1);
        setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
        // balancerSwitch(false, true): turn the balancer off, synchronously.
        getUtility().getAdmin().balancerSwitch(false, true);
    }

    /**
     * Per-test setup: creates a uniquely named table from {@code CREATE_TABLE} and
     * prepares a fresh {@link Job} plus a random temporary directory.
     *
     * @throws SQLException if the table cannot be created
     * @throws IOException  if the job cannot be instantiated
     */
    @Before
    public void before() throws SQLException, IOException {
        try (Connection connection = DriverManager.getConnection(getUrl())) {
            tableName = generateUniqueName();
            String createTableDdl = String.format(CREATE_TABLE, tableName);
            connection.createStatement().execute(createTableDdl);
            connection.commit();
        }
        conf = getUtility().getConfiguration();
        job = Job.getInstance(conf);
        tmpDir = getUtility().getRandomDir();
    }

    /**
     * Reads the snapshot with an explicit column list and no condition, then lets
     * {@code configureJob} run the job and compare its output with a direct query.
     */
    @Test
    public void testMapReduceSnapshots() throws Exception {
        String[] selectedColumns = {FIELD1, FIELD2, FIELD3};
        PhoenixMapReduceUtil.setInput(job, PhoenixIndexDBWritable.class, SNAPSHOT_NAME, tableName, tmpDir, null, selectedColumns);
        configureJob(job, tableName, null, null, false);
    }

    /**
     * Reads the snapshot through an ordered input query and asks
     * {@code configureJob} to split the table before the snapshot is taken.
     * No input query is forwarded to {@code configureJob}, so verification uses
     * its default select statement.
     */
    @Test
    public void testMapReduceSnapshotsMultiRegion() throws Exception {
        String orderedQuery = String.format("SELECT * FROM %s ORDER BY FIELD1 asc", tableName);
        PhoenixMapReduceUtil.setInput(job, PhoenixIndexDBWritable.class, SNAPSHOT_NAME, tableName, tmpDir, orderedQuery);
        configureJob(job, tableName, null, null, true);
    }

    /**
     * Reads the snapshot with an explicit column list and a filter condition; the
     * same condition is handed to {@code configureJob} for the verification query.
     */
    @Test
    public void testMapReduceSnapshotsWithCondition() throws Exception {
        final String condition = "FIELD3 > 0001";
        String[] selectedColumns = {FIELD1, FIELD2, FIELD3};
        PhoenixMapReduceUtil.setInput(job, PhoenixIndexDBWritable.class, SNAPSHOT_NAME, tableName, tmpDir, condition, selectedColumns);
        configureJob(job, tableName, null, condition, false);
    }

    /**
     * Reads the snapshot through an input query that orders by FIELD2 and limits
     * the result to one row; the same query is used for verification.
     */
    @Test
    public void testMapReduceSnapshotWithLimit() throws Exception {
        String limitedQuery = String.format("SELECT * FROM %s ORDER BY FIELD2 LIMIT 1", tableName);
        PhoenixMapReduceUtil.setInput(job, PhoenixIndexDBWritable.class, SNAPSHOT_NAME, tableName, tmpDir, limitedQuery);
        configureJob(job, tableName, limitedQuery, null, false);
    }

    /**
     * Runs a snapshot-backed job first and a regular table-backed job afterwards,
     * asserting that the snapshot name is present in the first job's configuration
     * and absent from the second job's configuration.
     */
    @Test
    public void testSnapshotMapReduceJobNotImpactingTableMapReduceJob() throws Exception {
        final String snapshotNameKey = "phoenix.mapreduce.snapshot.name";

        // Phase 1: snapshot-backed job.
        String[] selectedColumns = {FIELD1, FIELD2, FIELD3};
        PhoenixMapReduceUtil.setInput(job, PhoenixIndexDBWritable.class, SNAPSHOT_NAME, tableName, tmpDir, null, selectedColumns);
        configureJob(job, tableName, null, null, false);
        Configuration snapshotJobConf = job.getConfiguration();
        Assert.assertEquals("Correct snapshot name not found in configuration", SNAPSHOT_NAME, snapshotJobConf.get(snapshotNameKey));

        // Phase 2: table-backed job on a brand-new table; testJob() expects the counter to be zero.
        TestingMapReduceParallelScanGrouper.clearNumCallsToGetRegionBoundaries();
        try (Connection connection = DriverManager.getConnection(getUrl())) {
            tableName = generateUniqueName();
            connection.createStatement().execute(String.format(CREATE_TABLE, tableName));
            connection.commit();
            job = createAndTestJob(connection);
        }
        Configuration tableJobConf = job.getConfiguration();
        Assert.assertNull("Snapshot name is not null in Configuration", tableJobConf.get(snapshotNameKey));
    }

    /**
     * Creates the stock and stock-statistics tables, builds a table-backed job
     * over the stock table and runs it through {@code testJob}.
     *
     * @param conn open connection; it is cast to {@link PhoenixConnection} to obtain
     *             the configuration the job is created from
     * @return the job after it has been run
     */
    private Job createAndTestJob(Connection conn) throws SQLException, IOException, InterruptedException, ClassNotFoundException {
        final String stockTable = generateUniqueName();
        final String statsTable = generateUniqueName();
        conn.createStatement().execute(String.format(CREATE_STOCK_TABLE, stockTable));
        conn.createStatement().execute(String.format(CREATE_STOCK_STATS_TABLE, statsTable));
        conn.commit();

        PhoenixConnection phoenixConn = (PhoenixConnection) conn;
        Configuration jobConf = phoenixConn.getQueryServices().getConfiguration();
        Job tableJob = Job.getInstance(jobConf);
        String[] inputColumns = {STOCK_NAME, RECORDING_YEAR, "0.RECORDINGS_QUARTER"};
        PhoenixMapReduceUtil.setInput(tableJob, MapReduceIT.StockWritable.class, PhoenixTestingInputFormat.class, stockTable, null, inputColumns);

        testJob(conn, tableJob, stockTable, statsTable);
        return tableJob;
    }

    /**
     * Loads the stock data, wires mapper, reducer and output classes into the job
     * and asserts that it completes successfully.
     * Before anything else it checks that the region-boundary call counter of the
     * testing scan grouper is zero.
     *
     * @param conn                connection used to upsert the stock rows
     * @param job                 job to configure and run
     * @param stockTableName      input table
     * @param stockStatsTableName output table
     */
    private void testJob(Connection conn, Job job, String stockTableName, String stockStatsTableName) throws SQLException, InterruptedException, IOException, ClassNotFoundException {
        long boundaryCalls = (long) TestingMapReduceParallelScanGrouper.getNumCallsToGetRegionBoundaries();
        Assert.assertEquals("Failed to reset getRegionBoundaries counter for scanGrouper", 0L, boundaryCalls);

        upsertData(conn, stockTableName);

        job.getConfiguration().set("mapreduce.framework.name", "local");
        setOutput(job, stockStatsTableName);

        // Processing classes.
        job.setMapperClass(MapReduceIT.StockMapper.class);
        job.setReducerClass(MapReduceIT.StockReducer.class);
        job.setOutputFormatClass(PhoenixOutputFormat.class);

        // Intermediate and final key/value types.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(DoubleWritable.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(MapReduceIT.StockWritable.class);

        boolean succeeded = job.waitForCompletion(true);
        Assert.assertTrue("Job didn't complete successfully! Check logs for reason.", succeeded);
    }

    /**
     * Points the job's output at the statistics table: records the output table
     * name, stores the upsert statement under {@code phoenix.upsert.stmt} and
     * selects {@link PhoenixOutputFormat}.
     *
     * @param job                 job whose configuration is modified
     * @param stockStatsTableName table the job writes to
     */
    private void setOutput(Job job, String stockStatsTableName) {
        Configuration jobConf = job.getConfiguration();
        PhoenixConfigurationUtil.setOutputTableName(jobConf, stockStatsTableName);
        String upsertStatement = String.format("UPSERT into %s (%s, %s) values (?,?)", stockStatsTableName, STOCK_NAME, MAX_RECORDING);
        jobConf.set("phoenix.upsert.stmt", upsertStatement);
        job.setOutputFormatClass(PhoenixOutputFormat.class);
    }

    /**
     * Populates and snapshots the table, runs the snapshot-backed job and checks
     * that the rows collected by {@link TableSnapshotMapper} match a direct query
     * against the table issued with {@code CurrentSCN} set to the timestamp
     * recorded by {@code upsertAndSnapshot}. Finally verifies the contents of the
     * restore directory. The snapshot is deleted in all cases.
     *
     * @param job         job whose input has already been configured by the caller
     * @param tableName   table to populate, snapshot and query
     * @param inputQuery  verification query, or {@code null} to use
     *                    {@code SELECT * FROM tableName} (plus {@code condition})
     * @param condition   optional WHERE clause body for the default verification
     *                    query; ignored when {@code inputQuery} is given
     * @param shouldSplit whether the table is split before the snapshot; when set,
     *                    the collected rows are sorted by their first column before
     *                    being compared
     * @throws Exception on any setup, job or verification failure
     */
    private void configureJob(Job job, String tableName, String inputQuery, String condition, boolean shouldSplit) throws Exception {
        try {
            upsertAndSnapshot(tableName, shouldSplit, job.getConfiguration());
            result = new ArrayList<List<Object>>();

            job.setMapperClass(TableSnapshotMapper.class);
            job.setMapOutputKeyClass(ImmutableBytesWritable.class);
            job.setMapOutputValueClass(NullWritable.class);
            job.setOutputFormatClass(NullOutputFormat.class);
            Assert.assertTrue(job.waitForCompletion(true));

            // The row upserted after the snapshot was written after this timestamp.
            Properties props = new Properties();
            props.setProperty("CurrentSCN", Long.toString(this.timestamp));

            String verificationQuery = inputQuery;
            if (verificationQuery == null) {
                verificationQuery = "SELECT * FROM " + tableName;
                if (condition != null) {
                    verificationQuery = verificationQuery + " WHERE " + condition;
                }
            }

            if (shouldSplit) {
                // Rows arrive per region; order them by primary key to match the query result.
                Collections.sort(result, new Comparator<List<Object>>() {
                    @Override
                    public int compare(List<Object> o1, List<Object> o2) {
                        return ((String) o1.get(0)).compareTo((String) o2.get(0));
                    }
                });
            }

            // Close the verification connection and result set instead of leaking them;
            // closing the connection also releases the statement created from it.
            try (Connection conn = DriverManager.getConnection(getUrl(), props);
                 ResultSet rs = conn.createStatement().executeQuery(verificationQuery)) {
                for (List<Object> row : result) {
                    Assert.assertTrue("No data stored in the table!", rs.next());
                    Assert.assertEquals("Got the incorrect value for field1", row.get(0), rs.getString(1));
                    Assert.assertEquals("Got the incorrect value for field2", row.get(1), rs.getString(2));
                    Assert.assertEquals("Got the incorrect value for field3", row.get(2), Integer.valueOf(rs.getInt(3)));
                }
                Assert.assertFalse("Should only have stored " + result.size() + " rows in the table for the timestamp!", rs.next());
            }

            assertRestoreDirCount(this.conf, this.tmpDir.toString(), 1);
        } finally {
            deleteSnapshotIfExists(SNAPSHOT_NAME);
        }
    }

    /**
     * Upserts two AAPL rows (2009 and 2008) into the stock table and commits.
     *
     * @param conn           connection to write through; left open for the caller
     * @param stockTableName target table
     * @throws SQLException if preparing, executing or committing fails
     */
    private void upsertData(Connection conn, String stockTableName) throws SQLException {
        // The statement is closed here; the connection stays owned by the caller.
        try (PreparedStatement stmt = conn.prepareStatement(String.format(UPSERT, stockTableName))) {
            upsertRecord(stmt, "AAPL", 2009, new Double[]{85.88, 91.04, 88.5, 90.3});
            upsertRecord(stmt, "AAPL", 2008, new Double[]{75.88, 81.04, 78.5, 80.3});
            conn.commit();
        }
    }

    /**
     * Binds one stock row (name, year, quarterly recordings) to the statement and
     * executes it.
     *
     * @param stmt three-parameter upsert statement
     * @param name value for parameter 1
     * @param year value for parameter 2
     * @param data values wrapped in a double array for parameter 3
     * @throws SQLException if binding or execution fails
     */
    private void upsertRecord(PreparedStatement stmt, String name, int year, Double[] data) throws SQLException {
        PhoenixArray.PrimitiveDoublePhoenixArray quarterly =
                new PhoenixArray.PrimitiveDoublePhoenixArray(PDouble.INSTANCE, (Object[]) data);
        stmt.setString(1, name);
        stmt.setInt(2, year);
        stmt.setArray(3, quarterly);
        stmt.execute();
    }

    /**
     * Upserts six fixed rows into the given table over its own connection and
     * commits them.
     *
     * @param tableName target table
     * @throws SQLException if connecting, executing or committing fails
     */
    private void upsertData(String tableName) throws SQLException {
        // Both resources are opened here, so both are closed here.
        try (Connection conn = DriverManager.getConnection(getUrl());
             PreparedStatement stmt = conn.prepareStatement(String.format(UPSERT, tableName))) {
            upsertRecord(stmt, "AAAA", "JHHD", 37);
            upsertRecord(stmt, "BBBB", "JSHJ", 224);
            upsertRecord(stmt, "CCCC", "SSDD", 15);
            upsertRecord(stmt, "PPPP", "AJDG", 53);
            upsertRecord(stmt, "SSSS", "HSDG", 59);
            upsertRecord(stmt, "XXXX", "HDPP", 22);
            conn.commit();
        }
    }

    /**
     * Upserts the row {@code CCCC} plus 100 batches of eleven rows each, with keys
     * spread on both sides of {@code CCCC} and random FIELD3 values, then commits.
     * Used for the multi-region case, where the table is later split at
     * {@code CCCC}.
     *
     * @param tableName target table
     * @throws SQLException if connecting, executing or committing fails
     */
    private void upsertDataBeforeSplit(String tableName) throws SQLException {
        // Both resources are opened here, so both are closed here.
        try (Connection conn = DriverManager.getConnection(getUrl());
             PreparedStatement stmt = conn.prepareStatement(String.format(UPSERT, tableName))) {
            upsertRecord(stmt, "CCCC", "SSDD", RANDOM.nextInt());
            for (int i = 0; i < 100; ++i) {
                upsertRecord(stmt, "AAAA" + i, "JHHA" + i, RANDOM.nextInt());
                upsertRecord(stmt, "0000" + i, "JHHB" + i, RANDOM.nextInt());
                upsertRecord(stmt, "9999" + i, "JHHC" + i, RANDOM.nextInt());
                upsertRecord(stmt, "BBBB" + i, "JSHJ" + i, RANDOM.nextInt());
                upsertRecord(stmt, "BBBB1" + i, "JSHK" + i, RANDOM.nextInt());
                upsertRecord(stmt, "BBBB2" + i, "JSHL" + i, RANDOM.nextInt());
                upsertRecord(stmt, "CCCC1" + i, "SSDE" + i, RANDOM.nextInt());
                upsertRecord(stmt, "CCCC2" + i, "SSDF" + i, RANDOM.nextInt());
                upsertRecord(stmt, "PPPP" + i, "AJDH" + i, RANDOM.nextInt());
                upsertRecord(stmt, "SSSS" + i, "HSDG" + i, RANDOM.nextInt());
                upsertRecord(stmt, "XXXX" + i, "HDPP" + i, RANDOM.nextInt());
            }
            conn.commit();
        }
    }

    /**
     * Binds one (FIELD1, FIELD2, FIELD3) row to the statement and executes it.
     *
     * @param stmt   three-parameter upsert statement
     * @param field1 value for parameter 1
     * @param field2 value for parameter 2
     * @param field3 value for parameter 3
     * @throws SQLException if binding or execution fails
     */
    private void upsertRecord(PreparedStatement stmt, String field1, String field2, int field3) throws SQLException {
        int position = 0;
        stmt.setString(++position, field1);
        stmt.setString(++position, field2);
        stmt.setInt(++position, field3);
        stmt.execute();
    }

    /**
     * Loads test data, optionally splits the table, takes the snapshot and then
     * writes one extra row ({@code DDDD}) after the snapshot.
     * <p>
     * {@code timestamp} is captured after the snapshot completes and before the
     * extra row is written. When the test is parameterized with an external
     * restore, the snapshot is additionally copied into the directory configured
     * under {@code phoenix.tableSnapshot.restore.dir} and the configuration is
     * flagged as externally managed.
     *
     * @param tableName     table to populate and snapshot
     * @param shouldSplit   load the larger data set and split the table at
     *                      {@code CCCC} before the snapshot
     * @param configuration job configuration; read for the root and restore
     *                      directories and updated for the external-restore case
     * @throws Exception on any SQL, HBase or file-system failure
     */
    private void upsertAndSnapshot(String tableName, boolean shouldSplit, Configuration configuration) throws Exception {
        if (shouldSplit) {
            upsertDataBeforeSplit(tableName);
        } else {
            upsertData(tableName);
        }

        TableName hbaseTableName = TableName.valueOf(tableName);
        try (Connection conn = DriverManager.getConnection(getUrl());
             Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin()) {
            if (shouldSplit) {
                splitTableSync(admin, hbaseTableName, Bytes.toBytes("CCCC"), 2);
            }

            snapshotCreateSync(hbaseTableName, admin, SNAPSHOT_NAME);
            // Typed list: no cast needed to reach SnapshotDescription's accessors.
            List<SnapshotDescription> snapshots = admin.listSnapshots();
            Assert.assertEquals(tableName, snapshots.get(0).getTableNameAsString());

            // Everything written from here on is newer than the snapshot.
            this.timestamp = System.currentTimeMillis();
            try (PreparedStatement stmt = conn.prepareStatement(String.format(UPSERT, tableName))) {
                upsertRecord(stmt, "DDDD", "SNFB", 45);
                conn.commit();
            }

            if (this.isSnapshotRestoreDoneExternally) {
                Path rootDir = new Path(configuration.get("hbase.rootdir"));
                FileSystem fs = rootDir.getFileSystem(configuration);
                Path restoreDir = new Path(configuration.get("phoenix.tableSnapshot.restore.dir"));
                RestoreSnapshotHelper.copySnapshotForScanner(configuration, fs, rootDir, restoreDir, SNAPSHOT_NAME);
                PhoenixConfigurationUtil.setMRSnapshotManagedExternally(configuration, Boolean.TRUE);
            }
        }
    }

    /**
     * Requests a snapshot and waits for it to finish, making up to three attempts.
     * <p>
     * Each attempt issues the snapshot request and then polls once per second, at
     * most ten times, until the snapshot is reported finished. A failed request, a
     * failed poll or ten unfinished polls all move on to the next attempt.
     *
     * @param hbaseTableName table to snapshot
     * @param admin          admin handle used to request and poll the snapshot
     * @param snapshotName   name of the snapshot
     * @throws IOException          if no attempt produced a finished snapshot
     * @throws InterruptedException if interrupted while sleeping between polls
     */
    private void snapshotCreateSync(TableName hbaseTableName, Admin admin, String snapshotName) throws IOException, InterruptedException {
        SnapshotDescription description = new SnapshotDescription(snapshotName);
        boolean created = false;
        for (int attempt = 0; attempt < 3 && !created; attempt++) {
            if (attempt > 0) {
                LOGGER.info("Retry count {} for snapshot creation", attempt);
            }
            try {
                admin.snapshot(snapshotName, hbaseTableName);
            } catch (Exception e) {
                LOGGER.info("Snapshot creation failure for {}", snapshotName, e);
                continue;
            }
            for (int poll = 0; poll < 10; poll++) {
                Thread.sleep(1000L);
                try {
                    if (admin.isSnapshotFinished(description)) {
                        created = true;
                        break;
                    }
                } catch (Exception e) {
                    LOGGER.error("Snapshot creation failed.", e);
                    // Stop polling; the outer loop makes the next attempt.
                    break;
                }
            }
        }
        if (!created) {
            throw new IOException("Snapshot creation failed for " + snapshotName);
        }
    }

    /**
     * Deletes the named snapshot when it is listed by the admin; otherwise only
     * logs that it does not exist.
     *
     * @param snapshotName name of the snapshot to remove
     * @throws Exception if connecting, listing or deleting fails
     */
    private void deleteSnapshotIfExists(String snapshotName) throws Exception {
        try (Connection conn = DriverManager.getConnection(getUrl());
             Admin admin = conn.unwrap(PhoenixConnection.class).getQueryServices().getAdmin()) {
            // Typed list so the elements can be iterated as SnapshotDescription.
            List<SnapshotDescription> snapshotDescriptions = admin.listSnapshots();
            boolean isSnapshotPresent = false;
            if (snapshotDescriptions != null) {
                for (SnapshotDescription snapshotDescription : snapshotDescriptions) {
                    if (snapshotName.equals(snapshotDescription.getName())) {
                        isSnapshotPresent = true;
                        break;
                    }
                }
            }
            if (isSnapshotPresent) {
                admin.deleteSnapshot(snapshotName);
            } else {
                LOGGER.info("Snapshot {} does not exist. Possibly corrupted due to region movements.", snapshotName);
            }
        }
    }

    /**
     * Asserts how many entries the restore directory contains:
     * {@code expectedCount} when the snapshot restore was done externally,
     * zero otherwise.
     *
     * @param conf          configuration used to obtain the file system
     * @param restoreDir    directory to list
     * @param expectedCount entry count expected in the external-restore case
     * @throws IOException if the directory cannot be listed
     */
    private void assertRestoreDirCount(Configuration conf, String restoreDir, int expectedCount) throws IOException {
        FileStatus[] entries = FileSystem.get(conf).listStatus(new Path(restoreDir));
        Assert.assertNotNull(entries);
        long expected = this.isSnapshotRestoreDoneExternally ? (long) expectedCount : 0L;
        Assert.assertEquals(expected, (long) entries.length);
    }

    // Initialises the shared random source declared as a blank final field above.
    static {
        RANDOM = new Random();
    }

    public static class TableSnapshotMapper
    extends Mapper<NullWritable, PhoenixIndexDBWritable, ImmutableBytesWritable, NullWritable> {
        protected void map(NullWritable key, PhoenixIndexDBWritable record, Mapper.Context context) throws IOException, InterruptedException {
            List values = record.getValues();
            result.add(values);
            context.write((Object)new ImmutableBytesWritable(UUID.randomUUID().toString().getBytes()), (Object)NullWritable.get());
        }
    }
}

