/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hive.llap.cache;

import com.google.common.annotations.VisibleForTesting;
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.io.CacheTag;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.llap.ProactiveEviction;
import org.apache.hadoop.hive.llap.cache.EvictionListener;
import org.apache.hadoop.hive.llap.cache.LlapCacheableBuffer;
import org.apache.hadoop.hive.llap.cache.LlapDataBuffer;
import org.apache.hadoop.hive.llap.cache.LowLevelCache;
import org.apache.hadoop.hive.llap.cache.LowLevelCacheImpl;
import org.apache.hadoop.hive.llap.cache.LowLevelCacheMemoryManager;
import org.apache.hadoop.hive.llap.cache.LowLevelCachePolicy;
import org.apache.hadoop.hive.llap.cache.ProactiveEvictingCachePolicy;
import org.apache.hadoop.hive.llap.cache.TestCacheContentsTracker;
import org.apache.hadoop.hive.llap.cache.TestLowLevelLrfuCachePolicy;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
import org.apache.hadoop.hive.llap.metrics.LlapDaemonCacheMetrics;
import org.junit.Assert;
import org.junit.Test;

public class TestProactiveEviction {
    /**
     * Fixture cache tags. {@code assertMatchOnTags} walks this array in order and emits one
     * character per element, so the index noted next to each entry is also the character position
     * of that tag in the expected match strings used by the tests.
     */
    private static final CacheTag[] TEST_TAGS = new CacheTag[]{
        // 0 - 4: table fx.rates
        TestCacheContentsTracker.cacheTagBuilder("fx.rates", "from=USD", "to=HUF"),     // 0
        TestCacheContentsTracker.cacheTagBuilder("fx.rates", "from=USD", "to=EUR"),     // 1
        TestCacheContentsTracker.cacheTagBuilder("fx.rates", "from=USD", "to=EUR"),     // 2
        TestCacheContentsTracker.cacheTagBuilder("fx.rates", "from=USD", "to=EUR"),     // 3
        TestCacheContentsTracker.cacheTagBuilder("fx.rates", "from=EUR", "to=HUF"),     // 4
        // 5 - 8: table fx.futures
        TestCacheContentsTracker.cacheTagBuilder("fx.futures", "ccy=EUR"),              // 5
        TestCacheContentsTracker.cacheTagBuilder("fx.futures", "ccy=JPY"),              // 6
        TestCacheContentsTracker.cacheTagBuilder("fx.futures", "ccy=JPY"),              // 7
        TestCacheContentsTracker.cacheTagBuilder("fx.futures", "ccy=USD"),              // 8
        // 9 - 11: table fx.centralbanks (no partition values)
        TestCacheContentsTracker.cacheTagBuilder("fx.centralbanks", new String[0]),     // 9
        TestCacheContentsTracker.cacheTagBuilder("fx.centralbanks", new String[0]),     // 10
        TestCacheContentsTracker.cacheTagBuilder("fx.centralbanks", new String[0]),     // 11
        // 12 - 14: table equity.prices
        TestCacheContentsTracker.cacheTagBuilder("equity.prices", "ex=NYSE"),           // 12
        TestCacheContentsTracker.cacheTagBuilder("equity.prices", "ex=NYSE"),           // 13
        TestCacheContentsTracker.cacheTagBuilder("equity.prices", "ex=NASDAQ"),         // 14
        // 15 - 17: tables fixedincome.bonds and fixedincome.yieldcurves (no partition values)
        TestCacheContentsTracker.cacheTagBuilder("fixedincome.bonds", new String[0]),   // 15
        TestCacheContentsTracker.cacheTagBuilder("fixedincome.bonds", new String[0]),   // 16
        TestCacheContentsTracker.cacheTagBuilder("fixedincome.yieldcurves", new String[0]) // 17
    };

    /**
     * Builds a series of eviction requests (whole DB, whole table, single partition, and
     * combinations of those) and checks for each one which of the fixture tags in
     * {@code TEST_TAGS} it matches. Every expected string has one character per fixture tag:
     * {@code '1'} where the request should match, {@code '0'} where it should not.
     */
    @Test
    public void testCachetagAndRequestMatching() throws Exception {
        // Whole database "fx": all fx.* tags (indices 0-11).
        ProactiveEviction.Request.Builder wholeFxDb =
            ProactiveEviction.Request.Builder.create().addDb("fx");
        assertMatchOnTags(wholeFxDb, "111111111111000000");

        // Single table fx.futures (indices 5-8).
        ProactiveEviction.Request.Builder futuresTable =
            ProactiveEviction.Request.Builder.create().addTable("fx", "futures");
        assertMatchOnTags(futuresTable, "000001111000000000");

        // Single partition ccy=JPY of fx.futures (indices 6-7).
        ProactiveEviction.Request.Builder jpyFutures =
            ProactiveEviction.Request.Builder.create()
                .addPartitionOfATable("fx", "futures", buildParts("ccy", "JPY"));
        assertMatchOnTags(jpyFutures, "000000110000000000");

        // The same partition added twice (indices 12-13).
        ProactiveEviction.Request.Builder nysePricesTwice =
            ProactiveEviction.Request.Builder.create()
                .addPartitionOfATable("equity", "prices", buildParts("ex", "NYSE"))
                .addPartitionOfATable("equity", "prices", buildParts("ex", "NYSE"));
        assertMatchOnTags(nysePricesTwice, "000000000000110000");

        // Two tables of the same database (indices 0-8).
        ProactiveEviction.Request.Builder ratesAndFutures =
            ProactiveEviction.Request.Builder.create()
                .addTable("fx", "rates")
                .addTable("fx", "futures");
        assertMatchOnTags(ratesAndFutures, "111111111000000000");

        // A partition value that none of the fixture tags carries: nothing matches.
        ProactiveEviction.Request.Builder plnRates =
            ProactiveEviction.Request.Builder.create()
                .addPartitionOfATable("fx", "rates", buildParts("from", "PLN"));
        assertMatchOnTags(plnRates, "000000000000000000");

        // Table in another database (indices 15-16).
        ProactiveEviction.Request.Builder bondsTable =
            ProactiveEviction.Request.Builder.create().addTable("fixedincome", "bonds");
        assertMatchOnTags(bondsTable, "000000000000000110");

        // Two-level partition spec from=EUR/to=HUF (index 4 only).
        ProactiveEviction.Request.Builder eurHufRates =
            ProactiveEviction.Request.Builder.create()
                .addPartitionOfATable("fx", "rates", buildParts("from", "EUR", "to", "HUF"));
        assertMatchOnTags(eurHufRates, "000010000000000000");
    }

    /**
     * Builds an insertion-ordered partition spec from alternating key/value arguments, e.g.
     * {@code buildParts("from", "EUR", "to", "HUF")} yields {@code {from=EUR, to=HUF}}.
     *
     * @param vals partition keys and values, alternating: key, value, key, value, ...
     * @return a new map holding the pairs in the order they were given
     * @throws IllegalArgumentException if an odd number of arguments is passed, i.e. the last key
     *         has no value
     */
    private static LinkedHashMap<String, String> buildParts(String ... vals) {
        // Reject a dangling key up front instead of failing on vals[i + 1] half-way through.
        if (vals.length % 2 != 0) {
            throw new IllegalArgumentException(
                "Expected alternating partition keys and values, but got " + vals.length + " argument(s)");
        }
        LinkedHashMap<String, String> ret = new LinkedHashMap<>();
        for (int i = 0; i < vals.length; i += 2) {
            ret.put(vals[i], vals[i + 1]);
        }
        return ret;
    }

    /**
     * Checks which of the fixture tags in {@code TEST_TAGS} a request matches.
     *
     * <p>The request is first built, converted to its proto form and rebuilt from the first proto
     * request in the resulting list, so the matching is exercised on a request that went through the
     * proto round trip. Only that first proto request is used.
     *
     * @param requestBuilder builder describing the entities to evict
     * @param expected one character per element of {@code TEST_TAGS}, in order: {@code '1'} if the
     *        tag at that index is expected to match the request, {@code '0'} otherwise
     */
    private static void assertMatchOnTags(ProactiveEviction.Request.Builder requestBuilder, String expected) {
        // A JUnit assertion rather than the assert keyword: the keyword is skipped unless the JVM
        // runs with -ea, and this gives a clearer message than the string mismatch further down.
        Assert.assertEquals("expected pattern must have one character per fixture tag",
            (long) TEST_TAGS.length, (long) expected.length());
        ProactiveEviction.Request request = ProactiveEviction.Request.Builder.create().fromProtoRequest((LlapDaemonProtocolProtos.EvictEntityRequestProto)requestBuilder.build().toProtoRequests().get(0)).build();
        StringBuilder sb = new StringBuilder(TEST_TAGS.length);
        for (CacheTag tag : TEST_TAGS) {
            sb.append(request.isTagMatch(tag) ? '1' : '0');
        }
        Assert.assertEquals(expected, sb.toString());
    }

    /**
     * Exercises the proactive eviction sweeper end to end with a {@code DummyPolicy}:
     * <ol>
     *   <li>with proactive eviction disabled, creating a policy must leave the sweeper executor
     *       not started;</li>
     *   <li>with it enabled and a 200 ms sweep interval, buffers that were marked for eviction are
     *       expected to be reported as proactively evicted once two intervals have passed, while
     *       unmarked buffers stay untouched;</li>
     *   <li>a final {@code purge()} evicts the remaining buffers through the regular (non
     *       proactive) listener callback.</li>
     * </ol>
     * The test relies on wall-clock sleeps of twice the sweep interval.
     */
    @Test
    public void testProactiveSweep() throws Exception {
        // Start from a clean slate: shut down whatever sweeper executor is currently installed.
        closeSweeperExecutorForTest();

        // Phase 1: proactive eviction switched off.
        HiveConf disabledConf = new HiveConf();
        disabledConf.setBoolVar(HiveConf.ConfVars.LLAP_IO_PROACTIVE_EVICTION_ENABLED, false);
        new DummyPolicy(disabledConf);
        Assert.assertFalse(isProactiveEvictionSweeperThreadStarted());

        // Phase 2: default configuration plus a short sweep interval.
        HiveConf enabledConf = new HiveConf();
        long sweepIntervalInMs = 200L;
        enabledConf.setTimeVar(HiveConf.ConfVars.LLAP_IO_PROACTIVE_EVICTION_SWEEP_INTERVAL, sweepIntervalInMs, TimeUnit.MILLISECONDS);
        TestLowLevelLrfuCachePolicy.EvictionTracker tracker = new TestLowLevelLrfuCachePolicy.EvictionTracker();
        DummyPolicy policy = new DummyPolicy(enabledConf);
        policy.setEvictionListener(tracker);
        LowLevelCacheMemoryManager memoryManager =
            new LowLevelCacheMemoryManager(1024L, (LowLevelCachePolicy) policy, LlapDaemonCacheMetrics.create("test", "1"));
        Assert.assertTrue(isProactiveEvictionSweeperThreadStarted());

        // Ten fake buffers, all handed to the policy.
        LlapDataBuffer[] buffs = IntStream.range(0, 10)
            .mapToObj(i -> LowLevelCacheImpl.allocateFake())
            .toArray(LlapDataBuffer[]::new);
        for (LlapCacheableBuffer cached : buffs) {
            policy.cache(cached, null);
        }

        // Mark buffers 0 and 1, notifying the memory manager after each mark.
        buffs[0].markForEviction();
        memoryManager.notifyProactiveEvictionMark();
        buffs[1].markForEviction();
        memoryManager.notifyProactiveEvictionMark();

        // Immediately after marking nothing is expected to be evicted yet.
        assertEvictionStateInRange(0, 10, false, false, buffs, tracker);

        Thread.sleep(sweepIntervalInMs * 2L);
        assertEvictionStateInRange(0, 2, true, true, buffs, tracker);
        assertEvictionStateInRange(2, 10, false, false, buffs, tracker);

        // Mark buffers 5..9 in one go, with a single notification.
        for (int i = 5; i < 10; ++i) {
            buffs[i].markForEviction();
        }
        memoryManager.notifyProactiveEvictionMark();

        Thread.sleep(sweepIntervalInMs * 2L);
        assertEvictionStateInRange(0, 2, true, true, buffs, tracker);
        assertEvictionStateInRange(2, 5, false, false, buffs, tracker);
        assertEvictionStateInRange(5, 10, true, true, buffs, tracker);

        // Purging takes out the buffers that are still cached (2..4); expected return value is 3.
        Assert.assertEquals(3L, policy.purge());
        assertEvictionStateInRange(2, 5, true, false, buffs, tracker);

        // Exactly two sweeps are expected to have been counted by the policy.
        Assert.assertEquals(2L, (long) policy.proactiveEvictionSweepCount);
    }

    /**
     * Applies {@code assertBufferEvicted} to {@code buffers[fromInclusive .. toExclusive - 1]} in
     * ascending index order.
     */
    private static void assertEvictionStateInRange(int fromInclusive, int toExclusive, boolean expectingEvicted, boolean wasProactive, LlapDataBuffer[] buffers, TestLowLevelLrfuCachePolicy.EvictionTracker tracker) {
        for (int i = fromInclusive; i < toExclusive; ++i) {
            assertBufferEvicted(expectingEvicted, wasProactive, buffers[i], tracker);
        }
    }

    /**
     * Shuts down the sweeper executor obtained from {@code retrieveSweeperExecutor()} via
     * {@code shutdownNow()}. Does nothing when no executor is present.
     *
     * @throws Exception if the executor cannot be retrieved reflectively
     */
    public static void closeSweeperExecutorForTest() throws Exception {
        ScheduledExecutorService sweeper = retrieveSweeperExecutor();
        if (sweeper == null) {
            return;
        }
        sweeper.shutdownNow();
    }

    /**
     * Tells whether a sweeper executor exists and has not been shut down.
     *
     * @return {@code true} if {@code retrieveSweeperExecutor()} yields a non-null executor whose
     *         {@code isShutdown()} is {@code false}; {@code false} otherwise
     * @throws Exception if the executor cannot be retrieved reflectively
     */
    private static boolean isProactiveEvictionSweeperThreadStarted() throws Exception {
        ScheduledExecutorService sweeper = retrieveSweeperExecutor();
        return sweeper != null && !sweeper.isShutdown();
    }

    /**
     * Reads the {@code PROACTIVE_EVICTION_SWEEPER_EXECUTOR} field declared on
     * {@code ProactiveEvictingCachePolicy.Impl} through reflection. The field is read without an
     * instance and made accessible first, so it does not need to be visible to this test.
     *
     * @return the current field value, possibly {@code null}
     * @throws Exception if the field does not exist or cannot be read
     */
    private static ScheduledExecutorService retrieveSweeperExecutor() throws Exception {
        Class<?> owner = ProactiveEvictingCachePolicy.Impl.class;
        Field executorField = owner.getDeclaredField("PROACTIVE_EVICTION_SWEEPER_EXECUTOR");
        executorField.setAccessible(true);
        Object executor = executorField.get(null);
        return (ScheduledExecutorService) executor;
    }

    /**
     * Asserts the eviction state of a single buffer against the buffer itself and the tracker.
     *
     * <p>Three things are checked: the buffer's {@code isInvalid()} flag, membership in the
     * tracker's {@code evicted} collection (expected only for a non-proactive eviction) and
     * membership in its {@code proactivelyEvicted} collection (expected only for a proactive one).
     *
     * @param expectingEvicted whether the buffer is expected to have been evicted at all
     * @param wasProactive whether an expected eviction should have been a proactive one; ignored
     *        when {@code expectingEvicted} is {@code false}
     * @param buffer the buffer to inspect
     * @param evictionListener tracker that recorded the eviction callbacks
     */
    private static void assertBufferEvicted(boolean expectingEvicted, boolean wasProactive, LlapDataBuffer buffer, TestLowLevelLrfuCachePolicy.EvictionTracker evictionListener) {
        // Expectations are kept as booleans: comparing an int (1/0) with the boolean returned by
        // contains() through assertEquals(Object, Object) can never be equal.
        boolean expectRegularEviction = expectingEvicted && !wasProactive;
        boolean expectProactiveEviction = expectingEvicted && wasProactive;

        Assert.assertEquals("buffer invalidated", expectingEvicted, buffer.isInvalid());
        Assert.assertEquals("buffer reported through regular eviction callback",
            expectRegularEviction, evictionListener.evicted.contains(buffer));
        Assert.assertEquals("buffer reported through proactive eviction callback",
            expectProactiveEviction, evictionListener.proactivelyEvicted.contains(buffer));
    }

    /**
     * Minimal cache policy for this test. It keeps every cached buffer in a plain set, ignores
     * lock/unlock notifications and regular eviction requests, and can evict either all buffers
     * ({@link #purge()}) or only those marked for eviction ({@link #evictProactively()}).
     */
    class DummyPolicy
    extends ProactiveEvictingCachePolicy.Impl
    implements LowLevelCachePolicy {
        /** Receiver of eviction callbacks; stays {@code null} until a listener is set. */
        EvictionListener evictionListener = null;
        /** Buffers currently held by this policy. */
        private Set<LlapCacheableBuffer> buffers = new HashSet<>();
        /** Number of times {@link #evictProactively()} has been invoked. */
        @VisibleForTesting
        public int proactiveEvictionSweepCount = 0;

        /**
         * @param conf configuration handed to the parent policy implementation
         */
        protected DummyPolicy(Configuration conf) {
            super(conf);
        }

        /** Remembers the buffer; the priority is not used. */
        public void cache(LlapCacheableBuffer buffer, LowLevelCache.Priority priority) {
            buffers.add(buffer);
        }

        /** No-op. */
        public void notifyLock(LlapCacheableBuffer buffer) {
        }

        /** No-op. */
        public void notifyUnlock(LlapCacheableBuffer buffer) {
        }

        /** Never evicts anything on demand; always reports 0. */
        public long evictSomeBlocks(long memoryToReserve) {
            return 0L;
        }

        /** Installs the listener that is notified by {@link #purge()} and {@link #evictProactively()}. */
        public void setEvictionListener(EvictionListener listener) {
            evictionListener = listener;
        }

        /**
         * Tries to evict every held buffer, whether or not it is marked for eviction.
         *
         * @return sum of {@code getMemoryUsage()} over the buffers that were evicted
         */
        public long purge() {
            return evictOrPurge(true);
        }

        /** No-op. */
        public void debugDumpShort(StringBuilder sb) {
        }

        /** Counts the sweep, then evicts the buffers that are marked for eviction. */
        public void evictProactively() {
            proactiveEvictionSweepCount++;
            evictOrPurge(false);
        }

        /**
         * Walks the held buffers and evicts the eligible ones.
         *
         * @param isPurge {@code true} to consider every buffer and report evictions through
         *        {@code notifyEvicted}; {@code false} to consider only buffers marked for eviction
         *        and report them through {@code notifyProactivelyEvicted}
         * @return sum of {@code getMemoryUsage()} over the buffers that were evicted
         */
        private long evictOrPurge(boolean isPurge) {
            long freed = 0L;
            for (Iterator<LlapCacheableBuffer> cursor = buffers.iterator(); cursor.hasNext(); ) {
                LlapCacheableBuffer candidate = cursor.next();

                // A sweep only touches marked buffers; a purge takes every buffer.
                boolean eligible = isPurge || candidate.isMarkedForEviction();
                if (!eligible) {
                    continue;
                }
                // Only a result of 0 lets the eviction proceed (presumably "invalidated OK" -
                // confirm against LlapCacheableBuffer).
                if (candidate.invalidate() != 0) {
                    continue;
                }

                freed += candidate.getMemoryUsage();
                if (isPurge) {
                    evictionListener.notifyEvicted(candidate);
                } else {
                    evictionListener.notifyProactivelyEvicted(candidate);
                }
                cursor.remove();
            }
            return freed;
        }
    }
}

