/*
 * Decompiled with CFR 0.152.
 */
package org.apache.phoenix.hbase.index.write;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.phoenix.end2end.ServerMetadataCacheTestImpl;
import org.apache.phoenix.hbase.index.IndexTableName;
import org.apache.phoenix.hbase.index.IndexTestingUtils;
import org.apache.phoenix.hbase.index.covered.ColumnGroup;
import org.apache.phoenix.hbase.index.covered.CoveredColumn;
import org.apache.phoenix.hbase.index.covered.CoveredColumnIndexSpecifierBuilder;
import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
import org.apache.phoenix.hbase.index.util.TestIndexManagementUtil;
import org.apache.phoenix.hbase.index.write.recovery.PerRegionIndexWriteCache;
import org.apache.phoenix.hbase.index.write.recovery.StoreFailuresInCachePolicy;
import org.apache.phoenix.query.BaseTest;
import org.apache.phoenix.thirdparty.com.google.common.collect.Multimap;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestWALRecoveryCaching {
    private static final Logger LOGGER = LoggerFactory.getLogger(TestWALRecoveryCaching.class);
    private static final long ONE_SEC = 1000L;
    private static final long ONE_MIN = 60000L;
    private static final long TIMEOUT = 60000L;
    @Rule
    public IndexTableName testTable = new IndexTableName();
    private static CountDownLatch allowIndexTableToRecover;

    private String getIndexTableName() {
        return this.testTable.getTableNameString() + "_index";
    }

    @Ignore(value="Configuration issue - valid test, just needs fixing")
    @Test
    public void testWaitsOnIndexRegionToReload() throws Exception {
        HBaseTestingUtility util = new HBaseTestingUtility();
        Configuration conf = util.getConfiguration();
        BaseTest.setUpConfigForMiniCluster(conf);
        IndexTestingUtils.setupConfig(conf);
        conf.setBoolean("com.saleforce.hbase.index.checkversion", false);
        IndexManagementUtil.ensureMutableIndexingCorrectlyConfigured((Configuration)conf);
        util.startMiniCluster(2);
        HBaseAdmin admin = util.getHBaseAdmin();
        byte[] family = Bytes.toBytes((String)"family");
        byte[] qual = Bytes.toBytes((String)"qualifier");
        byte[] nonIndexedFamily = Bytes.toBytes((String)"nonIndexedFamily");
        String indexedTableName = this.getIndexTableName();
        ColumnGroup columns = new ColumnGroup(indexedTableName);
        columns.add(new CoveredColumn(family, qual));
        CoveredColumnIndexSpecifierBuilder builder = new CoveredColumnIndexSpecifierBuilder();
        builder.addIndexGroup(columns);
        TableDescriptor primaryTable = TableDescriptorBuilder.newBuilder((TableName)TableName.valueOf((byte[])this.testTable.getTableName())).addColumnFamily(ColumnFamilyDescriptorBuilder.of((byte[])family)).addColumnFamily(ColumnFamilyDescriptorBuilder.of((byte[])nonIndexedFamily)).build();
        builder.addArbitraryConfigForTesting("org.apache.hadoop.hbase.index.recovery.failurepolicy", ReleaseLatchOnFailurePolicy.class.getName());
        builder.build(primaryTable);
        admin.createTable(primaryTable);
        TableDescriptorBuilder indexTableBuilder = TableDescriptorBuilder.newBuilder((TableName)TableName.valueOf((byte[])Bytes.toBytes((String)this.getIndexTableName()))).addCoprocessor(IndexTableBlockingReplayObserver.class.getName());
        TestIndexManagementUtil.createIndexTable((Admin)admin, indexTableBuilder);
        ServerName shared = this.ensureTablesLiveOnSameServer(util.getMiniHBaseCluster(), Bytes.toBytes((String)indexedTableName), this.testTable.getTableName());
        Put p = new Put(Bytes.toBytes((String)"row"));
        p.addColumn(family, qual, Bytes.toBytes((String)"value"));
        Connection hbaseConn = ConnectionFactory.createConnection((Configuration)conf);
        Table primary = hbaseConn.getTable(TableName.valueOf((byte[])this.testTable.getTableName()));
        primary.put(p);
        allowIndexTableToRecover = new CountDownLatch(1);
        ArrayList<HRegion> online = new ArrayList<HRegion>();
        online.addAll(this.getRegionsFromServerForTable(util.getMiniHBaseCluster(), shared, this.testTable.getTableName()));
        online.addAll(this.getRegionsFromServerForTable(util.getMiniHBaseCluster(), shared, Bytes.toBytes((String)indexedTableName)));
        LOGGER.info("Current Server/Region paring: ");
        for (JVMClusterUtil.RegionServerThread t : util.getMiniHBaseCluster().getRegionServerThreads()) {
            HRegionServer server = t.getRegionServer();
            if (server.isStopping() || server.isStopped() || server.isAborted()) {
                LOGGER.info("\t== Offline: " + server.getServerName());
                continue;
            }
            List regions = server.getRegions();
            LOGGER.info("\t" + server.getServerName() + " regions: " + regions);
        }
        LOGGER.debug("Killing server " + shared);
        util.getMiniHBaseCluster().killRegionServer(shared);
        LOGGER.debug("Waiting on server " + shared + "to die");
        util.getMiniHBaseCluster().waitForRegionServerToStop(shared, 60000L);
        System.out.println(" ====== Killed shared server ==== ");
        Put p2 = new Put(p.getRow());
        p2.addColumn(nonIndexedFamily, Bytes.toBytes((String)"Not indexed"), Bytes.toBytes((String)"non-indexed value"));
        primary.put(p2);
        Assert.assertTrue((String)"Didn't find an error writing to index table within timeout!", (boolean)allowIndexTableToRecover.await(300000L, TimeUnit.MILLISECONDS));
        Scan s = new Scan();
        Table index = hbaseConn.getTable(TableName.valueOf((String)this.getIndexTableName()));
        ResultScanner scanner = index.getScanner(s);
        int count = 0;
        for (Result r : scanner) {
            LOGGER.info("Got index table result:" + r);
            ++count;
        }
        Assert.assertEquals((String)"Got an unexpected found of index rows", (long)1L, (long)count);
        scanner.close();
        index.close();
        primary.close();
        ServerMetadataCacheTestImpl.resetCache();
        util.shutdownMiniCluster();
    }

    private List<HRegion> getRegionsFromServerForTable(MiniHBaseCluster cluster, ServerName server, byte[] table) {
        List online = Collections.emptyList();
        for (JVMClusterUtil.RegionServerThread rst : cluster.getRegionServerThreads()) {
            if (!rst.getRegionServer().getServerName().equals((Object)server)) continue;
            online = rst.getRegionServer().getRegions(TableName.valueOf((byte[])table));
            break;
        }
        return online;
    }

    private ServerName ensureTablesLiveOnSameServer(MiniHBaseCluster cluster, byte[] indexTable, byte[] primaryTable) throws Exception {
        ServerName shared = this.getSharedServer(cluster, indexTable, primaryTable);
        boolean tryIndex = true;
        while (shared == null) {
            byte[] table = null;
            table = tryIndex ? indexTable : primaryTable;
            Set<ServerName> servers = this.getServersForTable(cluster, table);
            tryIndex = !tryIndex;
            Iterator<ServerName> iterator = servers.iterator();
            if (iterator.hasNext()) {
                ServerName server = iterator.next();
                List<HRegion> online = this.getRegionsFromServerForTable(cluster, server, table);
                LOGGER.info("Shutting down and reassigning regions from " + server);
                cluster.stopRegionServer(server);
                cluster.waitForRegionServerToStop(server, 60000L);
                for (Region region : online) {
                    cluster.getMaster().getAssignmentManager().assign(region.getRegionInfo());
                }
                LOGGER.info("Starting region server:" + server.getHostname());
                cluster.startRegionServer(server.getHostname(), server.getPort());
                cluster.waitForRegionServerToStart(server.getHostname(), server.getPort(), 60000L);
                LOGGER.info("STarting server to replace " + server);
                cluster.startRegionServer();
            }
            shared = this.getSharedServer(cluster, indexTable, primaryTable);
        }
        return shared;
    }

    private ServerName getSharedServer(MiniHBaseCluster cluster, byte[] indexTable, byte[] primaryTable) throws Exception {
        Set<ServerName> indexServers = this.getServersForTable(cluster, indexTable);
        Set<ServerName> primaryServers = this.getServersForTable(cluster, primaryTable);
        HashSet<ServerName> joinSet = new HashSet<ServerName>(indexServers);
        joinSet.addAll(primaryServers);
        if (joinSet.size() < indexServers.size() + primaryServers.size()) {
            for (ServerName server : joinSet) {
                if (!indexServers.contains(server) || !primaryServers.contains(server)) continue;
                return server;
            }
            throw new RuntimeException("Couldn't find a matching server on which both the primary and index table live, even though they have overlapping server sets");
        }
        return null;
    }

    private Set<ServerName> getServersForTable(MiniHBaseCluster cluster, byte[] table) throws Exception {
        HashSet<ServerName> indexServers = new HashSet<ServerName>();
        for (Region region : cluster.getRegions(table)) {
            indexServers.add(cluster.getServerHoldingRegion(null, region.getRegionInfo().getRegionName()));
        }
        return indexServers;
    }

    public static class ReleaseLatchOnFailurePolicy
    extends StoreFailuresInCachePolicy {
        public ReleaseLatchOnFailurePolicy(PerRegionIndexWriteCache failedIndexEdits) {
            super(failedIndexEdits);
        }

        public void handleFailure(Multimap<HTableInterfaceReference, Mutation> attempted, Exception cause) throws IOException {
            LOGGER.debug("Found index update failure!");
            if (allowIndexTableToRecover != null) {
                LOGGER.info("failed index write on WAL recovery - allowing index table to be restored.");
                allowIndexTableToRecover.countDown();
            }
            super.handleFailure(attempted, cause);
        }
    }

    public static class IndexTableBlockingReplayObserver
    implements RegionObserver,
    RegionCoprocessor {
        public Optional<RegionObserver> getRegionObserver() {
            return Optional.of(this);
        }

        public void preWALRestore(ObserverContext<? extends RegionCoprocessorEnvironment> ctx, RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {
            try {
                LOGGER.debug("Restoring logs for index table");
                if (allowIndexTableToRecover != null) {
                    allowIndexTableToRecover.await();
                    LOGGER.debug("Completed index table recovery wait latch");
                }
            }
            catch (InterruptedException e) {
                Assert.fail((String)"Should not be interrupted while waiting to allow the index to restore WALs.");
            }
        }
    }
}

