/*
 * Decompiled with CFR 0.152.
 */
package org.apache.impala.catalog.events;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
import org.apache.impala.catalog.CatalogException;
import org.apache.impala.catalog.MetaStoreClientPool;
import org.apache.impala.catalog.events.ExternalEventsProcessor;
import org.apache.impala.catalog.events.MetastoreEventsProcessor;
import org.apache.impala.catalog.events.SynchronousHMSEventProcessorForTests;
import org.apache.impala.common.Pair;
import org.apache.impala.compat.MetastoreShim;
import org.apache.impala.service.CatalogOpExecutor;
import org.apache.impala.testutil.CatalogServiceTestCatalog;
import org.apache.impala.util.EventSequence;
import org.apache.impala.util.NoOpEventSequence;
import org.apache.impala.util.PatternMatcher;
import org.apache.impala.util.RandomHiveQueryRunner;
import org.apache.thrift.TException;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EventsProcessorStressTest {
    private static final Random random = new Random(117L);
    private static CatalogServiceTestCatalog catalog_;
    private static MetastoreEventsProcessor eventsProcessor_;
    private static final String testDbPrefix_ = "events_stress_db_";
    private static final String testTblPrefix_ = "stress_test_tbl_";
    private static final int numClients_;
    private static final int numQueriesPerClient_;
    private ExecutorService impalaRefreshExecutorService_;
    private static final Logger LOG;

    private static Pair<Integer, Integer> getConcurrencyConfigs() {
        int numClients = 4;
        if (System.getProperty("numClients") != null) {
            numClients = Integer.parseInt(System.getProperty("numClients"));
        } else if (MetastoreShim.getMajorVersion() >= 3L) {
            numClients = 1;
        }
        int numQueriesPerClient = 50;
        if (System.getProperty("numQueriesPerClient") != null) {
            numQueriesPerClient = Integer.parseInt(System.getProperty("numQueriesPerClient"));
        } else if (MetastoreShim.getMajorVersion() >= 3L) {
            numQueriesPerClient = 200;
        }
        return new Pair((Object)numClients, (Object)numQueriesPerClient);
    }

    @BeforeClass
    public static void setupTestEnv() throws Exception {
        catalog_ = CatalogServiceTestCatalog.create();
        CatalogOpExecutor catalogOpExecutor = catalog_.getCatalogOpExecutor();
        try (MetaStoreClientPool.MetaStoreClient metaStoreClient = catalog_.getMetaStoreClient();){
            CurrentNotificationEventId currentNotificationId = metaStoreClient.getHiveClient().getCurrentNotificationEventId();
            eventsProcessor_ = new SynchronousHMSEventProcessorForTests(catalogOpExecutor, currentNotificationId.getEventId(), 10L);
            eventsProcessor_.start();
        }
        catalog_.setMetastoreEventProcessor((ExternalEventsProcessor)eventsProcessor_);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @AfterClass
    public static void destroyTestEnv() {
        try {
            for (int i = 0; i < numClients_; ++i) {
                try (MetaStoreClientPool.MetaStoreClient msClient = catalog_.getMetaStoreClient();){
                    msClient.getHiveClient().dropDatabase(testDbPrefix_ + i, true, true, true);
                }
                catalog_.removeDb(testDbPrefix_ + i);
            }
        }
        catch (Exception exception) {
        }
        finally {
            if (eventsProcessor_ != null) {
                eventsProcessor_.shutdown();
            }
        }
    }

    private void startImpalaRefreshClients() {
        this.impalaRefreshExecutorService_ = Executors.newFixedThreadPool(numClients_, new ThreadFactoryBuilder().setNameFormat("impala-refresh-client-%d").setDaemon(true).build());
        for (int i = 0; i < numClients_; ++i) {
            this.impalaRefreshExecutorService_.submit(() -> {
                int clientId = Integer.parseInt(Thread.currentThread().getName().substring("impala-refresh-client-".length()));
                String dbName = testDbPrefix_ + clientId;
                while (true) {
                    try {
                        while (true) {
                            List tablenames = catalog_.getTableNames(dbName, PatternMatcher.MATCHER_MATCH_ALL);
                            for (String tbl : tablenames) {
                                catalog_.reloadTable(catalog_.getTable(dbName, tbl), "test refresh operation for events stress test", (EventSequence)NoOpEventSequence.INSTANCE);
                            }
                            Thread.sleep(random.nextInt(3000));
                        }
                    }
                    catch (InterruptedException | CatalogException throwable) {
                        continue;
                    }
                    break;
                }
            });
        }
    }

    private void stopImpalaRefreshClients() {
        this.impalaRefreshExecutorService_.shutdownNow();
    }

    private long getCurrentNotificationId() throws TException {
        try (MetaStoreClientPool.MetaStoreClient metaStoreClient = catalog_.getMetaStoreClient();){
            long l = metaStoreClient.getHiveClient().getCurrentNotificationEventId().getEventId();
            return l;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testUsingRandomHiveQueries() throws Exception {
        LOG.info("Using number of clients: {} number of queries per client: {}", (Object)numClients_, (Object)numQueriesPerClient_);
        RandomHiveQueryRunner queryRunner = new RandomHiveQueryRunner(random, testDbPrefix_, testTblPrefix_, numClients_, numQueriesPerClient_, null);
        long eventIdBefore = this.getCurrentNotificationId();
        queryRunner.start();
        this.startImpalaRefreshClients();
        try {
            while (!queryRunner.isTerminated()) {
                Thread.sleep(random.nextInt(10000));
                eventsProcessor_.processEvents();
                Assert.assertEquals((Object)MetastoreEventsProcessor.EventProcessorStatus.ACTIVE, (Object)eventsProcessor_.getStatus());
            }
            queryRunner.checkForErrors();
        }
        catch (Exception ex) {
            LOG.error(ex.getMessage(), (Throwable)ex);
            Assert.fail((String)this.unwrapCause(ex));
        }
        finally {
            this.stopImpalaRefreshClients();
            queryRunner.shutdownNow();
            LOG.info("Total number of events generated {}", (Object)(this.getCurrentNotificationId() - eventIdBefore));
        }
    }

    private String unwrapCause(Throwable ex) {
        String cause = ex.getMessage();
        while (ex.getCause() != null) {
            cause = ex.getCause().getMessage();
            ex = ex.getCause();
        }
        return cause;
    }

    static {
        LOG = LoggerFactory.getLogger(EventsProcessorStressTest.class);
        Pair<Integer, Integer> configs = EventsProcessorStressTest.getConcurrencyConfigs();
        numClients_ = (Integer)configs.first;
        numQueriesPerClient_ = (Integer)configs.second;
    }
}

