/*
 * Decompiled with CFR 0.152.
 */
package org.apache.impala.catalog.events;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.impala.catalog.HdfsTable;
import org.apache.impala.catalog.IncompleteTable;
import org.apache.impala.catalog.MetaStoreClientPool;
import org.apache.impala.catalog.MetastoreApiTestUtils;
import org.apache.impala.catalog.Table;
import org.apache.impala.catalog.events.DbEventExecutor;
import org.apache.impala.catalog.events.EventExecutorService;
import org.apache.impala.catalog.events.ExternalEventsProcessor;
import org.apache.impala.catalog.events.MetastoreEvents;
import org.apache.impala.catalog.events.MetastoreEventsProcessor;
import org.apache.impala.catalog.events.RenameTableBarrierEvent;
import org.apache.impala.catalog.events.SynchronousHMSEventProcessorForTests;
import org.apache.impala.catalog.events.TableEventExecutor;
import org.apache.impala.common.Metrics;
import org.apache.impala.compat.MetastoreShim;
import org.apache.impala.service.BackendConfig;
import org.apache.impala.service.CatalogOpExecutor;
import org.apache.impala.testutil.CatalogServiceTestCatalog;
import org.apache.impala.testutil.TestUtils;
import org.apache.thrift.TException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

public class EventExecutorServiceTest {
    private static final String DB_NAME1 = "event_executor_service_test_db1";
    private static final String DB_NAME2 = "event_executor_service_test_db2";
    private static final int TEST_EVENT_PROCESSOR_IDLE_TIME_MS = 200;
    private static CatalogServiceTestCatalog catalog_;
    private static CatalogOpExecutor catalogOpExecutor_;
    private static MetastoreEventsProcessor eventsProcessor_;
    private static EventExecutorService eventExecutorService_;
    private static boolean prev_hierarchical_event_processing_;
    private static int prev_min_event_processor_idle_ms;

    @BeforeClass
    public static void setUpClass() throws Exception {
        catalog_ = CatalogServiceTestCatalog.create();
        catalogOpExecutor_ = catalog_.getCatalogOpExecutor();
        prev_hierarchical_event_processing_ = BackendConfig.INSTANCE.getBackendCfg().enable_hierarchical_event_processing;
        prev_min_event_processor_idle_ms = BackendConfig.INSTANCE.getBackendCfg().min_event_processor_idle_ms;
        BackendConfig.INSTANCE.getBackendCfg().enable_hierarchical_event_processing = true;
        BackendConfig.INSTANCE.getBackendCfg().min_event_processor_idle_ms = 200;
        try (MetaStoreClientPool.MetaStoreClient msClient = catalog_.getMetaStoreClient();){
            CurrentNotificationEventId currentNotificationId = msClient.getHiveClient().getCurrentNotificationEventId();
            eventsProcessor_ = new SynchronousHMSEventProcessorForTests(catalogOpExecutor_, currentNotificationId.getEventId(), 1000L);
            eventsProcessor_.start();
        }
        eventExecutorService_ = eventsProcessor_.getEventExecutorService();
        Assert.assertNotNull((Object)eventExecutorService_);
        catalog_.setMetastoreEventProcessor((ExternalEventsProcessor)eventsProcessor_);
    }

    @AfterClass
    public static void cleanUpClass() throws Exception {
        eventsProcessor_.setEventExecutorService(eventExecutorService_);
        EventExecutorServiceTest.dropDatabase(DB_NAME1);
        EventExecutorServiceTest.dropDatabase(DB_NAME2);
        eventsProcessor_.processEvents();
        eventsProcessor_.shutdown();
        BackendConfig.INSTANCE.getBackendCfg().enable_hierarchical_event_processing = prev_hierarchical_event_processing_;
        BackendConfig.INSTANCE.getBackendCfg().min_event_processor_idle_ms = prev_min_event_processor_idle_ms;
    }

    @Before
    public void setUp() throws Exception {
        eventsProcessor_.setEventExecutorService(eventExecutorService_);
        EventExecutorServiceTest.dropDatabase(DB_NAME1);
        EventExecutorServiceTest.dropDatabase(DB_NAME2);
        eventsProcessor_.processEvents();
        Assert.assertEquals((Object)MetastoreEventsProcessor.EventProcessorStatus.ACTIVE, (Object)eventsProcessor_.getStatus());
    }

    @After
    public void cleanUp() {
        Assert.assertEquals((Object)MetastoreEventsProcessor.EventProcessorStatus.ACTIVE, (Object)eventsProcessor_.getStatus());
    }

    private void createDatabase(String dbName) throws TException {
        try (MetaStoreClientPool.MetaStoreClient msClient = catalog_.getMetaStoreClient();){
            MetastoreApiTestUtils.createDatabase(msClient, null, dbName, null);
        }
    }

    private void createTable(String dbName, String tblName) throws TException {
        this.createTable(dbName, tblName, null, false, null);
    }

    private void createTransactionalTable(String dbName, String tblName, boolean isPartitioned) throws TException {
        HashMap<String, String> params = new HashMap<String, String>();
        params.put("transactional", "true");
        params.put("transactional_properties", "insert_only");
        this.createTable(dbName, tblName, params, isPartitioned, "MANAGED_TABLE");
    }

    private void createTable(String dbName, String tblName, Map<String, String> params, boolean isPartitioned, String tableType) throws TException {
        try (MetaStoreClientPool.MetaStoreClient msClient = catalog_.getMetaStoreClient();){
            MetastoreApiTestUtils.createTable(msClient, null, dbName, tblName, params, isPartitioned, tableType);
        }
    }

    private void addPartitions(String dbName, String tblName, List<List<String>> partitionValues) throws TException {
        try (MetaStoreClientPool.MetaStoreClient msClient = catalog_.getMetaStoreClient();){
            MetastoreApiTestUtils.addPartitions(msClient, dbName, tblName, partitionValues);
        }
    }

    private void insertIntoTransactionalTable(MetaStoreClientPool.MetaStoreClient msClient, String dbName, String tableName, List<String> partVal, long txnId, long writeId) throws TException, IOException {
        org.apache.hadoop.hive.metastore.api.Table table = msClient.getHiveClient().getTable(dbName, tableName);
        Partition partition = null;
        if (!CollectionUtils.isEmpty(partVal)) {
            partition = msClient.getHiveClient().getPartition(dbName, tableName, partVal);
        }
        MetastoreApiTestUtils.simulateInsertIntoTransactionalTableFromFS(catalog_.getMetaStoreClient(), table, partition, 1, txnId, writeId);
    }

    private void alterTableRename(String dbName, String tblName, String newDbName, String newTblName) throws TException {
        try (MetaStoreClientPool.MetaStoreClient msClient = catalog_.getMetaStoreClient();){
            org.apache.hadoop.hive.metastore.api.Table newTable = msClient.getHiveClient().getTable(dbName, tblName);
            newTable.setTableName(newTblName);
            newTable.setDbName(newDbName);
            msClient.getHiveClient().alter_table_with_environmentContext(dbName, tblName, newTable, null);
        }
    }

    private static void dropDatabase(String dbName) throws TException {
        try (MetaStoreClientPool.MetaStoreClient msClient = catalog_.getMetaStoreClient();){
            msClient.getHiveClient().dropDatabase(dbName, true, true, true);
        }
    }

    private void processEventsSynchronously(EventExecutorService eventExecutorService) {
        boolean isNeedProcess;
        int maxLoopTimes = 10;
        do {
            isNeedProcess = false;
            for (DbEventExecutor dbExecutor : eventExecutorService.getDbEventExecutors()) {
                if (dbExecutor.getOutstandingEventCount() != 0L) {
                    isNeedProcess = true;
                    dbExecutor.process();
                }
                for (TableEventExecutor tableExecutor : dbExecutor.getTableEventExecutors()) {
                    if (tableExecutor.getOutstandingEventCount() == 0L) continue;
                    isNeedProcess = true;
                    tableExecutor.process();
                }
            }
        } while (--maxLoopTimes > 0 && isNeedProcess);
        Assert.assertTrue((String)"Events should be processed within the limit", (maxLoopTimes > 0 ? 1 : 0) != 0);
    }

    private EventExecutorService createEventExecutorService(int numDbEventExecutor, int numTableEventExecutor) {
        EventExecutorService eventExecutorService = new EventExecutorService(eventsProcessor_, numDbEventExecutor, numTableEventExecutor);
        eventExecutorService.start();
        eventsProcessor_.setEventExecutorService(eventExecutorService);
        return eventExecutorService;
    }

    private void shutDownEventExecutorService(EventExecutorService eventExecutorService) throws Exception {
        Assert.assertTrue((eventsProcessor_.getEventExecutorService() == eventExecutorService ? 1 : 0) != 0);
        List metastoreEvents = eventsProcessor_.getEventsFactory().getFilteredEvents(eventsProcessor_.getNextMetastoreEvents(), eventsProcessor_.getMetrics());
        Assert.assertEquals((String)("Remaining events: " + metastoreEvents), (long)0L, (long)metastoreEvents.size());
        Assert.assertEquals((long)0L, (long)eventsProcessor_.getOutstandingEventCount());
        eventsProcessor_.setEventExecutorService(eventExecutorService_);
        eventExecutorService.shutdown(true);
    }

    @Test
    public void testAssignAndUnAssignExecutors() throws Exception {
        EventExecutorService eventExecutorService = this.createEventExecutorService(2, 3);
        List dbEventExecutorList = eventExecutorService.getDbEventExecutors();
        Assert.assertEquals((long)2L, (long)dbEventExecutorList.size());
        DbEventExecutor dbExecutor1 = (DbEventExecutor)dbEventExecutorList.get(0);
        DbEventExecutor dbExecutor2 = (DbEventExecutor)dbEventExecutorList.get(1);
        Assert.assertEquals((long)3L, (long)dbExecutor1.getTableEventExecutors().size());
        Assert.assertEquals((long)3L, (long)dbExecutor2.getTableEventExecutors().size());
        TableEventExecutor tableExecutor1 = (TableEventExecutor)dbExecutor1.getTableEventExecutors().get(0);
        TableEventExecutor tableExecutor2 = (TableEventExecutor)dbExecutor1.getTableEventExecutors().get(1);
        TableEventExecutor tableExecutor3 = (TableEventExecutor)dbExecutor2.getTableEventExecutors().get(0);
        Assert.assertEquals((long)0L, (long)dbExecutor1.getDbCount());
        Assert.assertEquals((long)0L, (long)dbExecutor2.getDbCount());
        Assert.assertEquals((long)0L, (long)tableExecutor1.getTableCount());
        Assert.assertEquals((long)0L, (long)tableExecutor2.getTableCount());
        Assert.assertEquals((long)0L, (long)tableExecutor3.getTableCount());
        String t1 = "t1";
        String t2 = "t2";
        String t3 = "t3";
        this.createDatabase(DB_NAME1);
        this.createTable(DB_NAME1, t1);
        this.createTable(DB_NAME1, t2);
        this.createDatabase(DB_NAME2);
        this.createTable(DB_NAME2, t3);
        eventsProcessor_.processEvents();
        Assert.assertNotNull((Object)catalog_.getDb(DB_NAME1));
        Assert.assertNotNull((Object)catalog_.getDb(DB_NAME2));
        Table table = catalog_.getTable(DB_NAME1, t1);
        Assert.assertTrue((boolean)(table instanceof IncompleteTable));
        table = catalog_.getTable(DB_NAME1, t2);
        Assert.assertTrue((boolean)(table instanceof IncompleteTable));
        table = catalog_.getTable(DB_NAME2, t3);
        Assert.assertTrue((boolean)(table instanceof IncompleteTable));
        Assert.assertEquals((Object)dbExecutor1, (Object)eventExecutorService.getDbEventExecutor(DB_NAME1));
        Assert.assertEquals((Object)tableExecutor1, (Object)dbExecutor1.getTableEventExecutor("event_executor_service_test_db1." + t1));
        Assert.assertEquals((Object)tableExecutor2, (Object)dbExecutor1.getTableEventExecutor("event_executor_service_test_db1." + t2));
        Assert.assertEquals((Object)dbExecutor2, (Object)eventExecutorService.getDbEventExecutor(DB_NAME2));
        Assert.assertEquals((Object)tableExecutor3, (Object)dbExecutor2.getTableEventExecutor("event_executor_service_test_db2." + t3));
        Assert.assertEquals((long)1L, (long)dbExecutor1.getDbCount());
        Assert.assertEquals((long)1L, (long)dbExecutor2.getDbCount());
        Assert.assertEquals((long)1L, (long)tableExecutor1.getTableCount());
        Assert.assertEquals((long)1L, (long)tableExecutor2.getTableCount());
        Assert.assertEquals((long)1L, (long)tableExecutor3.getTableCount());
        eventExecutorService.cleanup();
        Assert.assertEquals((long)1L, (long)dbExecutor1.getDbCount());
        Assert.assertEquals((long)1L, (long)dbExecutor2.getDbCount());
        Assert.assertEquals((long)1L, (long)tableExecutor1.getTableCount());
        Assert.assertEquals((long)1L, (long)tableExecutor2.getTableCount());
        Assert.assertEquals((long)1L, (long)tableExecutor3.getTableCount());
        Thread.sleep(1000L);
        eventExecutorService.cleanup();
        Assert.assertEquals((long)0L, (long)dbExecutor1.getDbCount());
        Assert.assertEquals((long)0L, (long)dbExecutor2.getDbCount());
        Assert.assertEquals((long)0L, (long)tableExecutor1.getTableCount());
        Assert.assertEquals((long)0L, (long)tableExecutor2.getTableCount());
        Assert.assertEquals((long)0L, (long)tableExecutor3.getTableCount());
        Assert.assertNull((Object)eventExecutorService.getDbEventExecutor(DB_NAME1));
        Assert.assertNull((Object)dbExecutor1.getTableEventExecutor("event_executor_service_test_db1." + t1));
        Assert.assertNull((Object)dbExecutor1.getTableEventExecutor("event_executor_service_test_db1." + t2));
        Assert.assertNull((Object)eventExecutorService.getDbEventExecutor(DB_NAME2));
        Assert.assertNull((Object)dbExecutor2.getTableEventExecutor("event_executor_service_test_db2." + t3));
        this.shutDownEventExecutorService(eventExecutorService);
    }

    private void clearOrShutEventExecutorService(boolean isShutDown) throws Exception {
        EventExecutorService eventExecutorService = new EventExecutorService(eventsProcessor_, 2, 2);
        List dbEventExecutorList = eventExecutorService.getDbEventExecutors();
        Assert.assertEquals((long)2L, (long)dbEventExecutorList.size());
        DbEventExecutor dbExecutor1 = (DbEventExecutor)dbEventExecutorList.get(0);
        DbEventExecutor dbExecutor2 = (DbEventExecutor)dbEventExecutorList.get(1);
        Assert.assertEquals((long)2L, (long)dbExecutor1.getTableEventExecutors().size());
        Assert.assertEquals((long)2L, (long)dbExecutor2.getTableEventExecutors().size());
        TableEventExecutor tableExecutor1 = (TableEventExecutor)dbExecutor1.getTableEventExecutors().get(0);
        TableEventExecutor tableExecutor2 = (TableEventExecutor)dbExecutor1.getTableEventExecutors().get(1);
        TableEventExecutor tableExecutor3 = (TableEventExecutor)dbExecutor2.getTableEventExecutors().get(0);
        TableEventExecutor tableExecutor4 = (TableEventExecutor)dbExecutor2.getTableEventExecutors().get(1);
        eventExecutorService.setStatus(EventExecutorService.EventExecutorStatus.ACTIVE);
        this.createDatabase(DB_NAME1);
        this.createDatabase(DB_NAME2);
        for (int i = 0; i < 10; ++i) {
            this.createTable(DB_NAME1, "t" + i);
            this.createTable(DB_NAME2, "t" + i);
        }
        List metastoreEvents = eventsProcessor_.getEventsFactory().getFilteredEvents(eventsProcessor_.getNextMetastoreEvents(), eventsProcessor_.getMetrics());
        for (MetastoreEvents.MetastoreEvent event : metastoreEvents) {
            eventExecutorService.dispatch(event);
        }
        for (DbEventExecutor executor : dbEventExecutorList) {
            executor.process();
        }
        Assert.assertEquals((long)1L, (long)dbExecutor1.getDbCount());
        Assert.assertEquals((long)1L, (long)dbExecutor2.getDbCount());
        Assert.assertEquals((long)5L, (long)tableExecutor1.getTableCount());
        Assert.assertEquals((long)5L, (long)tableExecutor2.getTableCount());
        Assert.assertEquals((long)5L, (long)tableExecutor3.getTableCount());
        Assert.assertEquals((long)5L, (long)tableExecutor4.getTableCount());
        if (isShutDown) {
            eventExecutorService.shutdown(false);
        } else {
            eventExecutorService.clear();
        }
        Assert.assertEquals((long)0L, (long)dbExecutor1.getDbCount());
        Assert.assertEquals((long)0L, (long)dbExecutor2.getDbCount());
        Assert.assertEquals((long)0L, (long)dbExecutor1.getOutstandingEventCount());
        Assert.assertEquals((long)0L, (long)tableExecutor1.getTableCount());
        Assert.assertEquals((long)0L, (long)tableExecutor2.getTableCount());
        Assert.assertEquals((long)0L, (long)tableExecutor3.getTableCount());
        Assert.assertEquals((long)0L, (long)tableExecutor4.getTableCount());
        if (!isShutDown) {
            eventExecutorService.shutdown(true);
        }
    }

    @Test
    public void testClearExecutors() throws Exception {
        this.clearOrShutEventExecutorService(false);
    }

    @Test
    public void testForceShutdownExecutors() throws Exception {
        this.clearOrShutEventExecutorService(true);
    }

    private void renameTableTest(String srcDbName, String srcTableName, String targetDbName, String targetTableName) throws Exception {
        this.createDatabase(srcDbName);
        if (!srcDbName.equalsIgnoreCase(targetDbName)) {
            this.createDatabase(targetDbName);
        }
        this.createTable(srcDbName, srcTableName);
        eventsProcessor_.processEvents();
        Table table = catalog_.getOrLoadTable(srcDbName, srcTableName, "test", null);
        Assert.assertTrue((String)"Table should have been loaded.", (boolean)(table instanceof HdfsTable));
        this.alterTableRename(srcDbName, srcTableName, targetDbName, targetTableName);
        eventsProcessor_.processEvents();
        Table tableAfterRename = catalog_.getTable(targetDbName, targetTableName);
        Assert.assertTrue((String)"Table after rename should be incomplete.", (boolean)(tableAfterRename instanceof IncompleteTable));
    }

    private List<RenameTableBarrierEvent> renameTableAndGetBarrierEvents(String srcDbName, String srcTableName, String targetDbName, String targetTableName) throws Exception {
        this.createDatabase(srcDbName);
        if (!srcDbName.equalsIgnoreCase(targetDbName)) {
            this.createDatabase(targetDbName);
        }
        this.createTable(srcDbName, srcTableName);
        eventsProcessor_.processEvents();
        Table table = catalog_.getOrLoadTable(srcDbName, srcTableName, "test", null);
        Assert.assertTrue((String)"Table should have been loaded.", (boolean)(table instanceof HdfsTable));
        this.alterTableRename(srcDbName, srcTableName, targetDbName, targetTableName);
        List metastoreEvents = eventsProcessor_.getEventsFactory().getFilteredEvents(eventsProcessor_.getNextMetastoreEvents(), eventsProcessor_.getMetrics());
        MetastoreEvents.AlterTableEvent alterTableEvent = null;
        for (MetastoreEvents.MetastoreEvent event : metastoreEvents) {
            if (!(event instanceof MetastoreEvents.AlterTableEvent) || !((MetastoreEvents.AlterTableEvent)event).isRename()) continue;
            alterTableEvent = (MetastoreEvents.AlterTableEvent)event;
            break;
        }
        Assert.assertNotNull(alterTableEvent);
        return eventsProcessor_.getEventExecutorService().getRenameTableBarrierEvents(alterTableEvent);
    }

    @Test
    public void testRenameTableWithinDBWithOneDBExecutorAndOneTableExecutor() throws Exception {
        EventExecutorService eventExecutorService = this.createEventExecutorService(1, 1);
        this.renameTableTest(DB_NAME1, "t1", DB_NAME1, "t2");
        this.shutDownEventExecutorService(eventExecutorService);
    }

    @Test
    public void testRenameTableAcrossDBWithOneDBExecutorAndOneTableExecutor() throws Exception {
        EventExecutorService eventExecutorService = this.createEventExecutorService(1, 1);
        this.renameTableTest(DB_NAME1, "t1", DB_NAME2, "t2");
        this.shutDownEventExecutorService(eventExecutorService);
    }

    @Test
    public void testRenameTableWithinDBWithOneDBExecutorAndTwoTableExecutor() throws Exception {
        EventExecutorService eventExecutorService = this.createEventExecutorService(1, 2);
        this.renameTableTest(DB_NAME1, "t1", DB_NAME1, "t2");
        this.shutDownEventExecutorService(eventExecutorService);
    }

    @Test
    public void testRenameTableAcrossDBWithOneDBExecutorAndTwoTableExecutor() throws Exception {
        EventExecutorService eventExecutorService = this.createEventExecutorService(1, 2);
        this.renameTableTest(DB_NAME1, "t1", DB_NAME2, "t2");
        this.shutDownEventExecutorService(eventExecutorService);
    }

    @Test
    public void testRenameTableAcrossDBWithTwoDBExecutorAndOneTableExecutor() throws Exception {
        EventExecutorService eventExecutorService = this.createEventExecutorService(2, 1);
        this.renameTableTest(DB_NAME1, "t1", DB_NAME2, "t2");
        this.shutDownEventExecutorService(eventExecutorService);
    }

    private void alterDatabaseAddParameters(String dbName, String key, String val) throws TException {
        try (MetaStoreClientPool.MetaStoreClient msClient = catalog_.getMetaStoreClient();){
            MetastoreApiTestUtils.addDatabaseParametersInHms(msClient, dbName, key, val);
        }
    }

    private void alterDatabase(Database newDatabase) throws TException {
        try (MetaStoreClientPool.MetaStoreClient msClient = catalog_.getMetaStoreClient();){
            msClient.getHiveClient().alterDatabase(newDatabase.getName(), newDatabase);
        }
    }

    private void alterDatabaseTest(String dbName, String[] tableNames) throws Exception {
        this.createDatabase(dbName);
        eventsProcessor_.processEvents();
        for (String tableName : tableNames) {
            this.createTable(dbName, tableName);
        }
        String currentLocation = catalog_.getDb(dbName).getMetaStoreDb().getLocationUri();
        String newLocation = currentLocation + File.separatorChar + "newTestLocation";
        Database alteredDb = catalog_.getDb(dbName).getMetaStoreDb().deepCopy();
        String dbKey = "dbKey";
        String dbVal = "dbVal";
        alteredDb.putToParameters(dbKey, dbVal);
        alteredDb.setLocationUri(newLocation);
        this.alterDatabase(alteredDb);
        eventsProcessor_.processEvents();
        Database finalDb = catalog_.getDb(dbName).getMetaStoreDb();
        Assert.assertTrue((String)String.format("Altered database should have set key %s to value %s in parameters", dbKey, dbVal), (boolean)dbVal.equals(finalDb.getParameters().get(dbKey)));
        Assert.assertTrue((String)"Altered database should have the updated location", (boolean)newLocation.equals(finalDb.getLocationUri()));
    }

    @Test
    public void testAlterDatabase() throws Exception {
        EventExecutorService eventExecutorService = this.createEventExecutorService(2, 2);
        this.alterDatabaseTest(DB_NAME1, new String[0]);
        this.alterDatabaseTest(DB_NAME2, new String[]{"t1", "t2", "t3"});
        this.shutDownEventExecutorService(eventExecutorService);
    }

    public void transactionTest(boolean abortTxn) throws Exception {
        this.createDatabase(DB_NAME1);
        this.createDatabase(DB_NAME2);
        String t1 = "t1";
        String t2 = "t2";
        String t3 = "t3";
        this.createTransactionalTable(DB_NAME1, t1, false);
        this.createTransactionalTable(DB_NAME1, t2, true);
        ArrayList<List<String>> t2PartVals = new ArrayList<List<String>>(2);
        t2PartVals.add(Arrays.asList("1"));
        t2PartVals.add(Arrays.asList("2"));
        this.addPartitions(DB_NAME1, t2, t2PartVals);
        this.createTransactionalTable(DB_NAME2, t3, true);
        ArrayList<List<String>> t3PartVals = new ArrayList<List<String>>(1);
        t3PartVals.add(Arrays.asList("3"));
        this.addPartitions(DB_NAME2, t3, t3PartVals);
        eventsProcessor_.processEvents();
        Table t1Table = catalog_.getOrLoadTable(DB_NAME1, t1, "test", null);
        Assert.assertTrue((boolean)(t1Table instanceof HdfsTable));
        Table t2Table = catalog_.getOrLoadTable(DB_NAME1, t2, "test", null);
        Assert.assertTrue((boolean)(t2Table instanceof HdfsTable));
        Table t3Table = catalog_.getOrLoadTable(DB_NAME2, t3, "test", null);
        Assert.assertTrue((boolean)(t3Table instanceof HdfsTable));
        try (MetaStoreClientPool.MetaStoreClient msClient = catalog_.getMetaStoreClient();){
            long txnId = MetastoreShim.openTransaction((IMetaStoreClient)msClient.getHiveClient());
            long writeId = MetastoreShim.allocateTableWriteId((IMetaStoreClient)msClient.getHiveClient(), (long)txnId, (String)DB_NAME1, (String)t1);
            this.insertIntoTransactionalTable(msClient, DB_NAME1, t1, null, txnId, writeId);
            MetastoreShim.commitTransaction((IMetaStoreClient)msClient.getHiveClient(), (long)txnId);
            eventsProcessor_.processEvents();
            ValidWriteIdList writeIdList = t1Table.getValidWriteIds();
            Assert.assertTrue((boolean)writeIdList.isWriteIdValid(writeId));
            txnId = MetastoreShim.openTransaction((IMetaStoreClient)msClient.getHiveClient());
            writeId = MetastoreShim.allocateTableWriteId((IMetaStoreClient)msClient.getHiveClient(), (long)txnId, (String)DB_NAME1, (String)t2);
            this.insertIntoTransactionalTable(msClient, DB_NAME1, t2, Arrays.asList("1"), txnId, writeId);
            this.insertIntoTransactionalTable(msClient, DB_NAME1, t2, Arrays.asList("2"), txnId, writeId);
            MetastoreShim.commitTransaction((IMetaStoreClient)msClient.getHiveClient(), (long)txnId);
            eventsProcessor_.processEvents();
            writeIdList = t2Table.getValidWriteIds();
            Assert.assertTrue((boolean)writeIdList.isWriteIdValid(writeId));
            txnId = MetastoreShim.openTransaction((IMetaStoreClient)msClient.getHiveClient());
            long writeId1 = MetastoreShim.allocateTableWriteId((IMetaStoreClient)msClient.getHiveClient(), (long)txnId, (String)DB_NAME1, (String)t1);
            this.insertIntoTransactionalTable(msClient, DB_NAME1, t1, null, txnId, writeId1);
            long writeId2 = MetastoreShim.allocateTableWriteId((IMetaStoreClient)msClient.getHiveClient(), (long)txnId, (String)DB_NAME1, (String)t2);
            this.insertIntoTransactionalTable(msClient, DB_NAME1, t2, Arrays.asList("2"), txnId, writeId2);
            long writeId3 = MetastoreShim.allocateTableWriteId((IMetaStoreClient)msClient.getHiveClient(), (long)txnId, (String)DB_NAME2, (String)t3);
            this.insertIntoTransactionalTable(msClient, DB_NAME2, t3, Arrays.asList("3"), txnId, writeId3);
            if (abortTxn) {
                MetastoreShim.abortTransaction((IMetaStoreClient)msClient.getHiveClient(), (long)txnId);
            } else {
                MetastoreShim.commitTransaction((IMetaStoreClient)msClient.getHiveClient(), (long)txnId);
            }
            eventsProcessor_.processEvents();
            ValidWriteIdList writeIdList1 = t1Table.getValidWriteIds();
            ValidWriteIdList writeIdList2 = t2Table.getValidWriteIds();
            ValidWriteIdList writeIdList3 = t3Table.getValidWriteIds();
            if (abortTxn) {
                Assert.assertTrue((boolean)writeIdList2.isWriteIdAborted(writeId2));
                Assert.assertTrue((boolean)writeIdList3.isWriteIdAborted(writeId3));
            } else {
                Assert.assertTrue((boolean)writeIdList1.isWriteIdValid(writeId1));
                Assert.assertTrue((boolean)writeIdList2.isWriteIdValid(writeId2));
                Assert.assertTrue((boolean)writeIdList3.isWriteIdValid(writeId3));
            }
        }
    }

    @Test
    public void testCommitTxn() throws Exception {
        Assume.assumeFalse((String)"Skipping this since COMMIT_TXN events are ignored on Apache Hive 2/3. So the validWriteIds list is not updated correctly.", (TestUtils.isApacheHiveVersion() && TestUtils.getHiveMajorVersion() <= 3 ? 1 : 0) != 0);
        EventExecutorService eventExecutorService = this.createEventExecutorService(2, 2);
        this.transactionTest(false);
        this.shutDownEventExecutorService(eventExecutorService);
    }

    @Test
    public void testAbortTxn() throws Exception {
        Assume.assumeFalse((String)"Skipping this since COMMIT_TXN events are ignored on Apache Hive 2/3. So the validWriteIds list is not updated correctly.", (TestUtils.isApacheHiveVersion() && TestUtils.getHiveMajorVersion() <= 3 ? 1 : 0) != 0);
        EventExecutorService eventExecutorService = this.createEventExecutorService(2, 2);
        this.transactionTest(true);
        this.shutDownEventExecutorService(eventExecutorService);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testRenameEventState() throws Exception {
        EventExecutorService eventExecutorService = this.createEventExecutorService(2, 1);
        List<RenameTableBarrierEvent> barrierEvents = this.renameTableAndGetBarrierEvents(DB_NAME1, "t1", DB_NAME2, "t2");
        eventsProcessor_.processEvents();
        RenameTableBarrierEvent dropTableBarrierEvent = null;
        RenameTableBarrierEvent createTableBarrierEvent = null;
        for (RenameTableBarrierEvent event : barrierEvents) {
            if (event.getEventType() == MetastoreEvents.MetastoreEventType.DROP_TABLE) {
                dropTableBarrierEvent = event;
                continue;
            }
            if (event.getEventType() != MetastoreEvents.MetastoreEventType.CREATE_TABLE) continue;
            createTableBarrierEvent = event;
        }
        int sleep = 10;
        Assert.assertNotNull(createTableBarrierEvent);
        RenameTableBarrierEvent.RenameEventState state = createTableBarrierEvent.getState();
        RenameTableBarrierEvent finalCreateTableBarrierEvent = createTableBarrierEvent;
        Callable<Boolean> createTableCallable = () -> {
            Assert.assertFalse((boolean)state.isCreateProcessed());
            Assert.assertFalse((boolean)finalCreateTableBarrierEvent.canProcess());
            while (!state.isDropProcessed()) {
                Thread.sleep(sleep);
            }
            Assert.assertFalse((boolean)state.isCreateProcessed());
            Assert.assertTrue((boolean)finalCreateTableBarrierEvent.canProcess());
            state.setProcessed(MetastoreEvents.MetastoreEventType.CREATE_TABLE, false);
            Assert.assertTrue((boolean)state.isCreateProcessed());
            Assert.assertFalse((boolean)finalCreateTableBarrierEvent.canProcess());
            return true;
        };
        Assert.assertNotNull((Object)dropTableBarrierEvent);
        RenameTableBarrierEvent finalDropTableBarrierEvent = dropTableBarrierEvent;
        Callable<Boolean> dropTableCallable = () -> {
            Assert.assertFalse((boolean)state.isDropProcessed());
            int delayCount = 5;
            while (delayCount-- > 0) {
                Assert.assertTrue((boolean)finalDropTableBarrierEvent.canProcess());
                Thread.sleep(sleep);
            }
            Assert.assertFalse((boolean)state.isDropProcessed());
            Assert.assertTrue((boolean)finalDropTableBarrierEvent.canProcess());
            state.setProcessed(MetastoreEvents.MetastoreEventType.DROP_TABLE, false);
            Assert.assertTrue((boolean)state.isDropProcessed());
            Assert.assertFalse((boolean)finalDropTableBarrierEvent.canProcess());
            return true;
        };
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        try {
            Future<Boolean> createTableTask = executorService.submit(createTableCallable);
            Future<Boolean> dropTableTask = executorService.submit(dropTableCallable);
            Assert.assertTrue((dropTableTask.get() != false && createTableTask.get() != false ? 1 : 0) != 0);
        }
        finally {
            executorService.shutdown();
            this.shutDownEventExecutorService(eventExecutorService);
        }
    }

    @Test
    public void testEventSkippingWhenDropDatabaseQueuedBehind() throws Exception {
        EventExecutorService eventExecutorService = new EventExecutorService(eventsProcessor_, 2, 1);
        eventExecutorService.setStatus(EventExecutorService.EventExecutorStatus.ACTIVE);
        String t1 = "t1";
        this.createDatabase(DB_NAME1);
        this.createDatabase(DB_NAME2);
        this.createTable(DB_NAME1, t1);
        this.alterDatabaseAddParameters(DB_NAME1, "dbKey", "dbVal");
        this.alterTableRename(DB_NAME1, t1, DB_NAME2, "t2");
        EventExecutorServiceTest.dropDatabase(DB_NAME1);
        EventExecutorServiceTest.dropDatabase(DB_NAME2);
        Metrics metrics = eventsProcessor_.getMetrics();
        long prevEventsSkipped = metrics.getCounter("events-skipped").getCount();
        long dbsAddedMetric = metrics.getCounter("databases-added").getCount();
        long dbsRemovedMetric = metrics.getCounter("databases-removed").getCount();
        long tablesAddedMetric = metrics.getCounter("tables-added").getCount();
        long tablesRemovedMetric = metrics.getCounter("tables-removed").getCount();
        List metastoreEvents = eventsProcessor_.getEventsFactory().getFilteredEvents(eventsProcessor_.getNextMetastoreEvents(), eventsProcessor_.getMetrics());
        Assert.assertTrue((metastoreEvents.size() > 0 ? 1 : 0) != 0);
        for (MetastoreEvents.MetastoreEvent event : metastoreEvents) {
            eventExecutorService.dispatch(event);
        }
        this.processEventsSynchronously(eventExecutorService);
        eventExecutorService.shutdown(true);
        Assert.assertEquals((long)(prevEventsSkipped + 8L), (long)metrics.getCounter("events-skipped").getCount());
        Assert.assertEquals((long)dbsAddedMetric, (long)metrics.getCounter("databases-added").getCount());
        Assert.assertEquals((long)dbsRemovedMetric, (long)metrics.getCounter("databases-removed").getCount());
        Assert.assertEquals((long)tablesAddedMetric, (long)metrics.getCounter("tables-added").getCount());
        Assert.assertEquals((long)tablesRemovedMetric, (long)metrics.getCounter("tables-removed").getCount());
    }

    @Test
    public void testGreatestSyncedEvent() throws Exception {
        EventExecutorService eventExecutorService = this.createEventExecutorService(1, 1);
        Assert.assertEquals((long)0L, (long)eventExecutorService.getOutstandingEventCount());
        long lastSyncedEventId = eventsProcessor_.getLastSyncedEventId();
        Assert.assertEquals((long)lastSyncedEventId, (long)eventExecutorService.getGreatestSyncedEventId());
        Assert.assertEquals((long)eventsProcessor_.getLastSyncedEventTime(), (long)eventExecutorService.getGreatestSyncedEventTime());
        this.createDatabase(DB_NAME1);
        this.createTable(DB_NAME1, "t1");
        eventsProcessor_.processEvents();
        Table table = catalog_.getOrLoadTable(DB_NAME1, "t1", "test", null);
        Assert.assertTrue((String)"Table should have been loaded.", (boolean)(table instanceof HdfsTable));
        Assert.assertEquals((long)(lastSyncedEventId + 2L), (long)eventsProcessor_.getLastSyncedEventId());
        Assert.assertEquals((long)eventsProcessor_.getLastSyncedEventId(), (long)eventExecutorService.getGreatestSyncedEventId());
        Assert.assertEquals((long)eventsProcessor_.getLastSyncedEventTime(), (long)eventExecutorService.getGreatestSyncedEventTime());
        this.shutDownEventExecutorService(eventExecutorService);
    }

    @Test
    public void testEventLogs() throws Exception {
        EventExecutorService eventExecutorService = new EventExecutorService(eventsProcessor_, 2, 1);
        eventExecutorService.setStatus(EventExecutorService.EventExecutorStatus.ACTIVE);
        long lastSyncedEventId = eventsProcessor_.getLastSyncedEventId();
        this.createDatabase(DB_NAME1);
        String t1 = "t1";
        String t2 = "t2";
        this.createTransactionalTable(DB_NAME1, t1, false);
        try (MetaStoreClientPool.MetaStoreClient msClient = catalog_.getMetaStoreClient();){
            long txnId = MetastoreShim.openTransaction((IMetaStoreClient)msClient.getHiveClient());
            long writeId = MetastoreShim.allocateTableWriteId((IMetaStoreClient)msClient.getHiveClient(), (long)txnId, (String)DB_NAME1, (String)t1);
            MetastoreShim.commitTransaction((IMetaStoreClient)msClient.getHiveClient(), (long)txnId);
        }
        this.createDatabase(DB_NAME2);
        this.createTable(DB_NAME2, t2);
        List metastoreEvents = eventsProcessor_.getEventsFactory().getFilteredEvents(eventsProcessor_.getNextMetastoreEvents(), eventsProcessor_.getMetrics());
        Assert.assertEquals((long)6L, (long)metastoreEvents.size());
        Assert.assertEquals((long)0L, (long)eventExecutorService.getInProgressLog().size());
        Assert.assertEquals((long)1L, (long)eventExecutorService.getProcessedLog().size());
        Assert.assertEquals((long)lastSyncedEventId, (long)eventExecutorService.getGreatestSyncedEventId());
        Assert.assertEquals((long)0L, (long)eventExecutorService.getPendingEventCount(lastSyncedEventId));
        long lastEventId = ((MetastoreEvents.MetastoreEvent)metastoreEvents.get(5)).getEventId();
        Assert.assertEquals((long)7L, (long)eventExecutorService.getPendingEventCount(lastEventId));
        for (MetastoreEvents.MetastoreEvent event : metastoreEvents) {
            eventExecutorService.dispatch(event);
        }
        Assert.assertEquals((long)6L, (long)eventExecutorService.getInProgressLog().size());
        Assert.assertEquals((long)1L, (long)eventExecutorService.getProcessedLog().size());
        Assert.assertEquals((long)lastSyncedEventId, (long)eventExecutorService.getGreatestSyncedEventId());
        Assert.assertEquals((long)6L, (long)eventExecutorService.getPendingEventCount(lastEventId));
        Assert.assertEquals((long)106L, (long)eventExecutorService.getPendingEventCount(lastEventId + 100L));
        eventExecutorService.removeFromInProgressLog(((MetastoreEvents.MetastoreEvent)metastoreEvents.get(4)).getEventId());
        Assert.assertEquals((long)5L, (long)eventExecutorService.getInProgressLog().size());
        Assert.assertEquals((long)2L, (long)eventExecutorService.getProcessedLog().size());
        Assert.assertEquals((long)lastSyncedEventId, (long)eventExecutorService.getGreatestSyncedEventId());
        Assert.assertEquals((long)5L, (long)eventExecutorService.getPendingEventCount(lastEventId));
        Assert.assertEquals((long)4L, (long)eventExecutorService.getPendingEventCount(((MetastoreEvents.MetastoreEvent)metastoreEvents.get(4)).getEventId()));
        Assert.assertEquals((long)105L, (long)eventExecutorService.getPendingEventCount(lastEventId + 100L));
        eventExecutorService.removeFromInProgressLog(((MetastoreEvents.MetastoreEvent)metastoreEvents.get(0)).getEventId());
        Assert.assertEquals((long)4L, (long)eventExecutorService.getInProgressLog().size());
        Assert.assertEquals((long)2L, (long)eventExecutorService.getProcessedLog().size());
        lastSyncedEventId = ((MetastoreEvents.MetastoreEvent)metastoreEvents.get(0)).getEventId();
        Assert.assertEquals((long)lastSyncedEventId, (long)eventExecutorService.getGreatestSyncedEventId());
        Assert.assertEquals((long)4L, (long)eventExecutorService.getPendingEventCount(lastEventId));
        eventExecutorService.removeFromInProgressLog(((MetastoreEvents.MetastoreEvent)metastoreEvents.get(1)).getEventId());
        Assert.assertEquals((long)3L, (long)eventExecutorService.getInProgressLog().size());
        Assert.assertEquals((long)2L, (long)eventExecutorService.getProcessedLog().size());
        lastSyncedEventId = ((MetastoreEvents.MetastoreEvent)metastoreEvents.get(1)).getEventId();
        Assert.assertEquals((long)lastSyncedEventId, (long)eventExecutorService.getGreatestSyncedEventId());
        Assert.assertEquals((long)3L, (long)eventExecutorService.getPendingEventCount(lastEventId));
        eventExecutorService.removeFromInProgressLog(((MetastoreEvents.MetastoreEvent)metastoreEvents.get(2)).getEventId());
        Assert.assertEquals((long)1L, (long)eventExecutorService.getInProgressLog().size());
        Assert.assertEquals((long)1L, (long)eventExecutorService.getProcessedLog().size());
        lastSyncedEventId = ((MetastoreEvents.MetastoreEvent)metastoreEvents.get(4)).getEventId();
        Assert.assertEquals((long)lastSyncedEventId, (long)eventExecutorService.getGreatestSyncedEventId());
        Assert.assertEquals((long)1L, (long)eventExecutorService.getPendingEventCount(lastEventId));
        eventExecutorService.removeFromInProgressLog(lastEventId);
        Assert.assertEquals((long)0L, (long)eventExecutorService.getInProgressLog().size());
        Assert.assertEquals((long)1L, (long)eventExecutorService.getProcessedLog().size());
        Assert.assertEquals((long)lastEventId, (long)eventExecutorService.getGreatestSyncedEventId());
        Assert.assertEquals((long)0L, (long)eventExecutorService.getPendingEventCount(lastEventId));
    }
}

