/*
 * Decompiled with CFR 0.152.
 */
package org.apache.phoenix.end2end;

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.end2end.IndexToolIT;
import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.IndexScrutiny;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.ReadOnlyProps;
import org.apache.phoenix.util.TestUtil;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category(value={NeedsOwnMiniClusterTest.class})
@RunWith(value=Parameterized.class)
public class ConcurrentMutationsExtendedIT
extends ParallelStatsDisabledIT {
    // Class-wide logger; used by testConcurrentUpserts for timing and failure messages.
    private static final Logger LOGGER = LoggerFactory.getLogger(ConcurrentMutationsExtendedIT.class);
    // Parameterized flag: when true, the tests issue their index DDL as "CREATE UNCOVERED INDEX".
    private final boolean uncovered;
    // Shared random source with a fixed seed (5); drawn from by the testConcurrentUpserts workers
    // and by DelayingRegionObserver.preBatchMutate.
    private static final Random RAND = new Random(5L);
    // Tables whose name starts with this prefix get a 5000 ms sleep in DelayingRegionObserver.postBatchMutate.
    private static final String MVCC_LOCK_TEST_TABLE_PREFIX = "MVCCLOCKTEST_";
    // Tables whose name starts with this prefix get the exclusivity check and the 12000 ms sleep
    // in DelayingRegionObserver.preBatchMutate.
    private static final String LOCK_TEST_TABLE_PREFIX = "LOCKTEST_";
    // NOTE(review): presumably the basis for the 12000 ms observer sleep and the 15000 await
    // timeouts used by the lock tests (10000 + 2000 / 10000 + 5000) — confirm.
    private static final int ROW_LOCK_WAIT_TIME = 10000;
    // Same value (1000000) that doSetup configures for "phoenix.max.lookback.age.seconds".
    private static final int MAX_LOOKBACK_AGE = 1000000;
    // Monitor on which the delete and upsert workers of testSynchronousDeletesAndUpsertValues take turns.
    private final Object lock = new Object();

    /**
     * Creates one parameterized instance of this test class.
     *
     * @param uncovered when {@code true}, the tests create their indexes with the
     *                  {@code UNCOVERED} keyword
     */
    public ConcurrentMutationsExtendedIT(boolean uncovered) {
        super();
        this.uncovered = uncovered;
    }

    /**
     * Starts the test driver once for the class with the four configuration overrides
     * these tests run under: the global-index row age threshold, the max lookback age,
     * the HBase row-lock wait duration and the Phoenix concurrent-index wait duration.
     *
     * @throws Exception if the test driver cannot be set up
     */
    @BeforeClass
    public static synchronized void doSetup() throws Exception {
        // Typed map instead of a raw HashMap so the entry iterator handed to ReadOnlyProps is type-checked.
        HashMap<String, String> props = Maps.newHashMapWithExpectedSize(4);
        props.put("phoenix.global.index.row.age.threshold.to.delete.ms", Long.toString(0L));
        // Use the class constant rather than repeating the literal 1000000.
        props.put("phoenix.max.lookback.age.seconds", Integer.toString(MAX_LOOKBACK_AGE));
        props.put("hbase.rowlock.wait.duration", "100");
        props.put("phoenix.index.concurrent.wait.duration.ms", "10");
        ConcurrentMutationsExtendedIT.setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
    }

    /**
     * Supplies the values for the {@code uncovered} constructor argument.
     *
     * @return a fixed-size collection holding {@code true} followed by {@code false}
     */
    @Parameterized.Parameters(name="uncovered={0}")
    public static synchronized Collection<Boolean> data() {
        Boolean[] variants = new Boolean[]{Boolean.TRUE, Boolean.FALSE};
        return Arrays.asList(variants);
    }

    /**
     * Interleaves full-table deletes with upserts, one operation at a time, and then
     * verifies the index against the data table.
     *
     * <p>The delete worker issues 50 auto-committed {@code DELETE FROM} statements, 20 ms
     * apart. The upsert worker issues 1000 upserts over keys 0..9 and commits only on
     * iterations where {@code i % 20 == 0} or on the last iteration. Both workers open a
     * fresh connection per iteration and hold {@code lock} while using it, so a delete and
     * an upsert never run at the same time.
     *
     * <p>NOTE(review): because every upsert iteration uses its own connection, presumably
     * only the upserts on committing iterations are ever written — confirm this is intended.
     *
     * @throws Exception if setup, the wait for the workers, or index verification fails
     */
    @Test
    public void testSynchronousDeletesAndUpsertValues() throws Exception {
        final String tableName = ConcurrentMutationsExtendedIT.generateUniqueName();
        String indexName = ConcurrentMutationsExtendedIT.generateUniqueName();
        // Close the setup/verification connection even when an assertion fails.
        try (Connection conn = DriverManager.getConnection(ConcurrentMutationsExtendedIT.getUrl())) {
            conn.createStatement().execute("CREATE TABLE " + tableName + "(k1 INTEGER NOT NULL, k2 INTEGER NOT NULL, v1 INTEGER, CONSTRAINT pk PRIMARY KEY (k1,k2)) COLUMN_ENCODED_BYTES = 0");
            TestUtil.addCoprocessor(conn, tableName, DelayingRegionObserver.class);
            conn.createStatement().execute("CREATE " + (this.uncovered ? "UNCOVERED " : " ") + "INDEX " + indexName + " ON " + tableName + "(v1)");
            final CountDownLatch doneSignal = new CountDownLatch(2);
            Runnable r1 = new Runnable(){

                @Override
                public void run() {
                    try {
                        Properties props = PropertiesUtil.deepCopy((Properties)TestUtil.TEST_PROPERTIES);
                        for (int i = 0; i < 50; ++i) {
                            Thread.sleep(20L);
                            synchronized (ConcurrentMutationsExtendedIT.this.lock) {
                                try (PhoenixConnection deleteConn = DriverManager.getConnection(ConcurrentMutationsExtendedIT.getUrl(), props).unwrap(PhoenixConnection.class)) {
                                    deleteConn.setAutoCommit(true);
                                    deleteConn.createStatement().execute("DELETE FROM " + tableName);
                                }
                            }
                        }
                    }
                    catch (SQLException e) {
                        throw new RuntimeException(e);
                    }
                    catch (InterruptedException e) {
                        // Restore the interrupt status; Thread.interrupted() would clear it instead.
                        Thread.currentThread().interrupt();
                        throw new RuntimeException(e);
                    }
                    finally {
                        doneSignal.countDown();
                    }
                }
            };
            Runnable r2 = new Runnable(){

                @Override
                public void run() {
                    try {
                        Properties props = PropertiesUtil.deepCopy((Properties)TestUtil.TEST_PROPERTIES);
                        int nRowsToUpsert = 1000;
                        for (int i = 0; i < nRowsToUpsert; ++i) {
                            synchronized (ConcurrentMutationsExtendedIT.this.lock) {
                                try (PhoenixConnection upsertConn = DriverManager.getConnection(ConcurrentMutationsExtendedIT.getUrl(), props).unwrap(PhoenixConnection.class)) {
                                    upsertConn.createStatement().execute("UPSERT INTO " + tableName + " VALUES (" + i % 10 + ", 0, 1)");
                                    if (i % 20 == 0 || i == nRowsToUpsert - 1) {
                                        upsertConn.commit();
                                    }
                                }
                            }
                        }
                    }
                    catch (SQLException e) {
                        throw new RuntimeException(e);
                    }
                    finally {
                        doneSignal.countDown();
                    }
                }
            };
            Thread t1 = new Thread(r1);
            t1.start();
            Thread t2 = new Thread(r2);
            t2.start();
            // Do not verify the index while the workers may still be writing.
            Assert.assertTrue("Ran out of time", doneSignal.await(60L, TimeUnit.SECONDS));
            IndexToolIT.verifyIndexTable(tableName, indexName, conn);
        }
    }

    /**
     * Runs full-table deletes and upserts concurrently, with no mutual exclusion, and then
     * verifies two indexes on the same column against the data table.
     *
     * <p>The delete worker issues 50 auto-committed {@code DELETE FROM} statements, 20 ms
     * apart, on one connection. The upsert worker issues 1000 upserts over keys 0..9 on
     * one connection, committing whenever {@code i % 20 == 0} and once more at the end.
     * The second index is created with
     * {@code IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2}.
     *
     * @throws Exception if setup, the wait for the workers, or index verification fails
     */
    @Test
    public void testConcurrentDeletesAndUpsertValues() throws Exception {
        final String tableName = ConcurrentMutationsExtendedIT.generateUniqueName();
        String indexName = ConcurrentMutationsExtendedIT.generateUniqueName();
        String singleCellindexName = "SC_" + ConcurrentMutationsExtendedIT.generateUniqueName();
        // Close the setup/verification connection even when an assertion fails.
        try (Connection conn = DriverManager.getConnection(ConcurrentMutationsExtendedIT.getUrl())) {
            conn.createStatement().execute("CREATE TABLE " + tableName + "(k1 INTEGER NOT NULL, k2 INTEGER NOT NULL, v1 INTEGER, CONSTRAINT pk PRIMARY KEY (k1,k2))");
            TestUtil.addCoprocessor(conn, tableName, DelayingRegionObserver.class);
            conn.createStatement().execute("CREATE " + (this.uncovered ? "UNCOVERED " : " ") + "INDEX " + indexName + " ON " + tableName + "(v1)");
            conn.createStatement().execute("CREATE " + (this.uncovered ? "UNCOVERED " : " ") + "INDEX " + singleCellindexName + " ON " + tableName + "(v1) IMMUTABLE_STORAGE_SCHEME=SINGLE_CELL_ARRAY_WITH_OFFSETS, COLUMN_ENCODED_BYTES=2");
            final CountDownLatch doneSignal = new CountDownLatch(2);
            Runnable r1 = new Runnable(){

                @Override
                public void run() {
                    // The worker connection is closed before the latch is counted down.
                    try (Connection deleteConn = DriverManager.getConnection(ConcurrentMutationsExtendedIT.getUrl())) {
                        deleteConn.setAutoCommit(true);
                        for (int i = 0; i < 50; ++i) {
                            Thread.sleep(20L);
                            deleteConn.createStatement().execute("DELETE FROM " + tableName);
                        }
                    }
                    catch (SQLException e) {
                        throw new RuntimeException(e);
                    }
                    catch (InterruptedException e) {
                        // Restore the interrupt status; Thread.interrupted() would clear it instead.
                        Thread.currentThread().interrupt();
                        throw new RuntimeException(e);
                    }
                    finally {
                        doneSignal.countDown();
                    }
                }
            };
            Runnable r2 = new Runnable(){

                @Override
                public void run() {
                    try (Connection upsertConn = DriverManager.getConnection(ConcurrentMutationsExtendedIT.getUrl())) {
                        for (int i = 0; i < 1000; ++i) {
                            upsertConn.createStatement().execute("UPSERT INTO " + tableName + " VALUES (" + i % 10 + ", 0, 1)");
                            if (i % 20 == 0) {
                                upsertConn.commit();
                            }
                        }
                        // Flush whatever the last partial batch left uncommitted.
                        upsertConn.commit();
                    }
                    catch (SQLException e) {
                        throw new RuntimeException(e);
                    }
                    finally {
                        doneSignal.countDown();
                    }
                }
            };
            Thread t1 = new Thread(r1);
            t1.start();
            Thread t2 = new Thread(r2);
            t2.start();
            // Do not verify the indexes while the workers may still be writing.
            Assert.assertTrue("Ran out of time", doneSignal.await(60L, TimeUnit.SECONDS));
            IndexToolIT.verifyIndexTable(tableName, indexName, conn);
            IndexToolIT.verifyIndexTable(tableName, singleCellindexName, conn);
        }
    }

    /**
     * Has ten threads upsert concurrently into the same 499 rows and then verifies the
     * index, expecting the verification to report exactly 499 rows.
     *
     * <p>Each worker issues 10000 upserts on its own connection. The key cycles through
     * {@code i % nRows}; each of the four value columns is either {@code null} or a random
     * integer (the indexed column {@code v1} is reduced modulo {@code nIndexValues}). A
     * worker commits whenever {@code i % batchSize == 0} and once more at the end. A
     * {@link SQLException} in a worker is logged and ends that worker without failing the
     * test directly.
     *
     * @throws Exception if setup or index verification fails, or the workers do not finish
     *                   within 120 seconds
     */
    @Test
    public void testConcurrentUpserts() throws Exception {
        final int nThreads = 10;
        final int batchSize = 100;
        final int nRows = 499;
        final int nIndexValues = 23;
        final int nUpsertsPerThread = 10000;
        final String tableName = ConcurrentMutationsExtendedIT.generateUniqueName();
        String indexName = ConcurrentMutationsExtendedIT.generateUniqueName();
        // Close the setup/verification connection even when an assertion fails.
        try (Connection conn = DriverManager.getConnection(ConcurrentMutationsExtendedIT.getUrl())) {
            conn.createStatement().execute("CREATE TABLE " + tableName + "(k1 INTEGER NOT NULL, k2 INTEGER NOT NULL, a.v1 INTEGER, b.v2 INTEGER, c.v3 INTEGER, d.v4 INTEGER,CONSTRAINT pk PRIMARY KEY (k1,k2))  COLUMN_ENCODED_BYTES = 0, VERSIONS=1");
            conn.createStatement().execute("CREATE " + (this.uncovered ? "UNCOVERED " : " ") + "INDEX " + indexName + " ON " + tableName + "(v1)" + (this.uncovered ? "" : "INCLUDE(v2, v3)"));
            final CountDownLatch doneSignal = new CountDownLatch(nThreads);
            long startTime = EnvironmentEdgeManager.currentTimeMillis();
            // The worker keeps no per-thread state, so one instance is shared by all threads.
            Runnable upserter = new Runnable(){

                @Override
                public void run() {
                    // The worker connection is closed before the latch is counted down.
                    try (Connection upsertConn = DriverManager.getConnection(ConcurrentMutationsExtendedIT.getUrl())) {
                        for (int i = 0; i < nUpsertsPerThread; ++i) {
                            upsertConn.createStatement().execute("UPSERT INTO " + tableName + " VALUES (" + i % nRows + ", 0, " + (RAND.nextBoolean() ? null : Integer.valueOf(RAND.nextInt() % nIndexValues)) + ", " + (RAND.nextBoolean() ? null : Integer.valueOf(RAND.nextInt())) + ", " + (RAND.nextBoolean() ? null : Integer.valueOf(RAND.nextInt())) + ", " + (RAND.nextBoolean() ? null : Integer.valueOf(RAND.nextInt())) + ")");
                            if (i % batchSize == 0) {
                                upsertConn.commit();
                            }
                        }
                        // Flush whatever the last partial batch left uncommitted.
                        upsertConn.commit();
                    }
                    catch (SQLException e) {
                        // Pass the exception as the throwable argument so the stack trace is logged.
                        LOGGER.warn("Exception during upsert", e);
                    }
                    finally {
                        doneSignal.countDown();
                    }
                }
            };
            for (int t = 0; t < nThreads; ++t) {
                new Thread(upserter).start();
            }
            Assert.assertTrue("Ran out of time", doneSignal.await(120L, TimeUnit.SECONDS));
            LOGGER.info("Total upsert time in ms : " + (EnvironmentEdgeManager.currentTimeMillis() - startTime));
            long actualRowCount = IndexToolIT.verifyIndexTable(tableName, indexName, conn);
            Assert.assertEquals((long)nRows, actualRowCount);
        }
    }

    /**
     * Has two threads upsert the same row of a {@code LOCKTEST_}-prefixed table at the same
     * time and checks that neither fails and that the index scrutiny reports one row.
     *
     * <p>The table carries {@link DelayingRegionObserver}, whose {@code preBatchMutate}
     * throws for {@code LOCKTEST_} tables if it is entered while another invocation is
     * still inside its sleep. Any exception raised in a worker is recorded in
     * {@code failedMsg} and fails the test.
     *
     * @throws Exception if setup, the wait for the workers, or index scrutiny fails
     */
    @Test
    public void testRowLockDuringPreBatchMutateWhenIndexed() throws Exception {
        final String tableName = LOCK_TEST_TABLE_PREFIX + ConcurrentMutationsExtendedIT.generateUniqueName();
        String indexName = ConcurrentMutationsExtendedIT.generateUniqueName();
        // Close the setup/verification connection even when an assertion fails.
        try (Connection conn = DriverManager.getConnection(ConcurrentMutationsExtendedIT.getUrl())) {
            conn.createStatement().execute("CREATE TABLE " + tableName + "(k VARCHAR PRIMARY KEY, v INTEGER) COLUMN_ENCODED_BYTES = 0");
            TestUtil.addCoprocessor(conn, tableName, DelayingRegionObserver.class);
            conn.createStatement().execute("CREATE " + (this.uncovered ? "UNCOVERED " : " ") + "INDEX " + indexName + " ON " + tableName + "(v)");
            final CountDownLatch doneSignal = new CountDownLatch(2);
            final String[] failedMsg = new String[1];
            Runnable r1 = new Runnable(){

                @Override
                public void run() {
                    // The worker connection is closed before the latch is counted down.
                    try (Connection upsertConn = DriverManager.getConnection(ConcurrentMutationsExtendedIT.getUrl())) {
                        upsertConn.createStatement().execute("UPSERT INTO " + tableName + " VALUES ('foo',0)");
                        upsertConn.createStatement().execute("UPSERT INTO " + tableName + " VALUES ('foo',1)");
                        upsertConn.commit();
                    }
                    catch (Exception e) {
                        // toString() is never null, unlike getMessage(), so the failure cannot slip past assertNull.
                        failedMsg[0] = e.toString();
                        throw new RuntimeException(e);
                    }
                    finally {
                        doneSignal.countDown();
                    }
                }
            };
            Runnable r2 = new Runnable(){

                @Override
                public void run() {
                    try (Connection upsertConn = DriverManager.getConnection(ConcurrentMutationsExtendedIT.getUrl())) {
                        upsertConn.createStatement().execute("UPSERT INTO " + tableName + " VALUES ('foo',2)");
                        upsertConn.createStatement().execute("UPSERT INTO " + tableName + " VALUES ('foo',3)");
                        upsertConn.commit();
                    }
                    catch (Exception e) {
                        failedMsg[0] = e.toString();
                        throw new RuntimeException(e);
                    }
                    finally {
                        doneSignal.countDown();
                    }
                }
            };
            Thread t1 = new Thread(r1);
            t1.start();
            Thread t2 = new Thread(r2);
            t2.start();
            // NOTE(review): 15000 paired with TimeUnit.SECONDS is over four hours; the value looks
            // like it was computed in milliseconds. Left unchanged because the observer sleeps
            // 12000 ms per batch on LOCKTEST_ tables, so 15000 ms may be too short — confirm.
            Assert.assertTrue("Ran out of time", doneSignal.await(15000L, TimeUnit.SECONDS));
            Assert.assertNull(failedMsg[0], failedMsg[0]);
            long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, tableName, indexName);
            Assert.assertEquals(1L, actualRowCount);
        }
    }

    /**
     * Has two threads upsert the same pre-existing row of an {@code MVCCLOCKTEST_}-prefixed
     * table at the same time and checks that the index scrutiny reports one row.
     *
     * <p>The row is written and committed first; {@link DelayingRegionObserver} is attached
     * afterwards, and its {@code postBatchMutate} sleeps 5000 ms for {@code MVCCLOCKTEST_}
     * tables. Any exception raised in a worker is recorded in {@code failedMsg} and fails
     * the test, matching {@code testRowLockDuringPreBatchMutateWhenIndexed}.
     *
     * @throws Exception if setup, the wait for the workers, or index scrutiny fails
     */
    @Test
    public void testLockUntilMVCCAdvanced() throws Exception {
        final String tableName = MVCC_LOCK_TEST_TABLE_PREFIX + ConcurrentMutationsExtendedIT.generateUniqueName();
        String indexName = ConcurrentMutationsExtendedIT.generateUniqueName();
        // Close the setup/verification connection even when an assertion fails.
        try (Connection conn = DriverManager.getConnection(ConcurrentMutationsExtendedIT.getUrl())) {
            conn.createStatement().execute("CREATE TABLE " + tableName + "(k VARCHAR PRIMARY KEY, v INTEGER) COLUMN_ENCODED_BYTES = 0");
            conn.createStatement().execute("CREATE " + (this.uncovered ? "UNCOVERED " : " ") + "INDEX " + indexName + " ON " + tableName + "(v,k)");
            conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES ('foo',0)");
            conn.commit();
            // Attached only after the initial row is committed, so that write is not delayed.
            TestUtil.addCoprocessor(conn, tableName, DelayingRegionObserver.class);
            final CountDownLatch doneSignal = new CountDownLatch(2);
            final String[] failedMsg = new String[1];
            Runnable r1 = new Runnable(){

                @Override
                public void run() {
                    // The worker connection is closed before the latch is counted down.
                    try (Connection upsertConn = DriverManager.getConnection(ConcurrentMutationsExtendedIT.getUrl())) {
                        upsertConn.createStatement().execute("UPSERT INTO " + tableName + " VALUES ('foo',1)");
                        upsertConn.commit();
                    }
                    catch (Exception e) {
                        // toString() is never null, unlike getMessage(), so the failure cannot slip past assertNull.
                        failedMsg[0] = e.toString();
                        throw new RuntimeException(e);
                    }
                    finally {
                        doneSignal.countDown();
                    }
                }
            };
            Runnable r2 = new Runnable(){

                @Override
                public void run() {
                    try (Connection upsertConn = DriverManager.getConnection(ConcurrentMutationsExtendedIT.getUrl())) {
                        upsertConn.createStatement().execute("UPSERT INTO " + tableName + " VALUES ('foo',2)");
                        upsertConn.commit();
                    }
                    catch (Exception e) {
                        failedMsg[0] = e.toString();
                        throw new RuntimeException(e);
                    }
                    finally {
                        doneSignal.countDown();
                    }
                }
            };
            Thread t1 = new Thread(r1);
            t1.start();
            Thread t2 = new Thread(r2);
            t2.start();
            // NOTE(review): 15000 paired with TimeUnit.SECONDS is over four hours; the value looks
            // like it was computed in milliseconds — confirm before changing the unit.
            Assert.assertTrue("Ran out of time", doneSignal.await(15000L, TimeUnit.SECONDS));
            // Previously the recorded worker failure was never checked.
            Assert.assertNull(failedMsg[0], failedMsg[0]);
            long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, tableName, indexName);
            Assert.assertEquals(1L, actualRowCount);
        }
    }

    /**
     * Region observer that the tests attach to their tables to slow down batch mutations.
     *
     * <p>Behaviour depends on the table name prefix:
     * <ul>
     *   <li>{@code MVCCLOCKTEST_}: {@link #postBatchMutate} sleeps 5000 ms.</li>
     *   <li>{@code LOCKTEST_}: {@link #preBatchMutate} sleeps 12000 ms and throws if it is
     *       entered while another invocation on this observer is still inside that sleep.</li>
     *   <li>any table: {@link #preBatchMutate} additionally sleeps a random 0-9 ms.</li>
     * </ul>
     */
    public static class DelayingRegionObserver
    extends SimpleRegionObserver {
        // True while an invocation of preBatchMutate on a LOCKTEST_ table is inside its long sleep.
        // NOTE(review): the check-then-set on this flag is not atomic, so two near-simultaneous
        // entries could both pass the check; the detection is best-effort.
        private volatile boolean lockedTableRow;

        /**
         * Sleeps 5000 ms after a batch mutation on an {@code MVCCLOCKTEST_}-prefixed table;
         * does nothing for other tables.
         *
         * @param c           observer context; only the table name of its region is read
         * @param miniBatchOp the batch being applied; not inspected here
         * @throws IOException declared by the signature; not thrown by this code itself
         */
        public void postBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
            String tableName = ((RegionCoprocessorEnvironment)c.getEnvironment()).getRegionInfo().getTable().getNameAsString();
            if (!tableName.startsWith(ConcurrentMutationsExtendedIT.MVCC_LOCK_TEST_TABLE_PREFIX)) {
                return;
            }
            try {
                Thread.sleep(5000L);
            }
            catch (InterruptedException e) {
                // Cut the delay short but keep the interrupt visible to the caller.
                Thread.currentThread().interrupt();
            }
        }

        /**
         * Delays every batch mutation by a random 0-9 ms. For {@code LOCKTEST_}-prefixed
         * tables it first checks that no other invocation is currently inside this hook's
         * long sleep, then sleeps 12000 ms itself.
         *
         * @param c           observer context; only the table name of its region is read
         * @param miniBatchOp the batch being applied; its first operation's row is used in
         *                    the error message
         * @throws HBaseIOException a {@link DoNotRetryIOException} when a {@code LOCKTEST_}
         *                          table is entered while another invocation holds the flag
         */
        public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c, MiniBatchOperationInProgress<Mutation> miniBatchOp) throws HBaseIOException {
            String tableName = ((RegionCoprocessorEnvironment)c.getEnvironment()).getRegionInfo().getTable().getNameAsString();
            // Tracks whether THIS invocation set the flag, so the finally block never clears
            // a flag that belongs to a different, still-sleeping invocation.
            boolean raisedFlag = false;
            try {
                if (tableName.startsWith(ConcurrentMutationsExtendedIT.LOCK_TEST_TABLE_PREFIX)) {
                    if (this.lockedTableRow) {
                        throw new DoNotRetryIOException("Expected lock in preBatchMutate to be exclusive, but it wasn't for row " + Bytes.toStringBinary((byte[])((Mutation)miniBatchOp.getOperation(0)).getRow()));
                    }
                    this.lockedTableRow = true;
                    raisedFlag = true;
                    Thread.sleep(12000L);
                }
                // nextInt(10) is always in [0, 10); Math.abs(nextInt()) % 10 is negative for
                // Integer.MIN_VALUE, which would make Thread.sleep throw IllegalArgumentException.
                Thread.sleep(RAND.nextInt(10));
            }
            catch (InterruptedException e) {
                // Cut the delay short but keep the interrupt visible to the caller.
                Thread.currentThread().interrupt();
            }
            finally {
                if (raisedFlag) {
                    this.lockedTableRow = false;
                }
            }
        }
    }
}

