/*
 * Decompiled with CFR 0.152.
 */
package org.apache.phoenix.cache.aggcache;

import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.cache.GlobalCache;
import org.apache.phoenix.cache.TenantCache;
import org.apache.phoenix.cache.aggcache.SpillManager;
import org.apache.phoenix.coprocessor.BaseRegionScanner;
import org.apache.phoenix.coprocessor.GroupByCache;
import org.apache.phoenix.coprocessor.GroupedAggregateRegionObserver;
import org.apache.phoenix.expression.aggregator.Aggregator;
import org.apache.phoenix.expression.aggregator.ServerAggregators;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.memory.InsufficientMemoryException;
import org.apache.phoenix.memory.MemoryManager;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.types.PInteger;
import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.Closeables;
import org.apache.phoenix.util.PhoenixKeyValueUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SpillableGroupByCache
implements GroupByCache {
    private static final Logger LOGGER = LoggerFactory.getLogger(SpillableGroupByCache.class);
    private static final int SPGBY_CACHE_MIN_SIZE = 4096;
    private final LinkedHashMap<ImmutableBytesWritable, Aggregator[]> cache;
    private final ConcurrentMap<ImmutableBytesWritable, ImmutableBytesWritable> aggregateValueToLastScannedRowKeys;
    private final boolean isIncompatibleClient;
    private SpillManager spillManager = null;
    private long totalNumElements;
    private final ServerAggregators aggregators;
    private final RegionCoprocessorEnvironment env;
    private final MemoryManager.MemoryChunk chunk;

    public SpillableGroupByCache(final RegionCoprocessorEnvironment env, ImmutableBytesPtr tenantId, ServerAggregators aggs, int estSizeNum, boolean isIncompatibleClient) {
        this.isIncompatibleClient = isIncompatibleClient;
        this.totalNumElements = 0L;
        this.aggregators = aggs;
        this.env = env;
        final int estValueSize = this.aggregators.getEstimatedByteSize();
        TenantCache tenantCache = GlobalCache.getTenantCache(env, tenantId);
        Configuration conf = env.getConfiguration();
        long maxCacheSizeConf = conf.getLongBytes("phoenix.groupby.maxCacheSize", 0x6400000L);
        final int numSpillFilesConf = conf.getInt("phoenix.groupby.spillFiles", 2);
        int maxSizeNum = (int)(maxCacheSizeConf / (long)estValueSize);
        int minSizeNum = 4096 / estValueSize;
        final int maxCacheSize = Math.max(minSizeNum, Math.min(maxSizeNum, estSizeNum));
        long estSize = GroupedAggregateRegionObserver.sizeOfUnorderedGroupByMap(maxCacheSize, estValueSize);
        try {
            this.chunk = tenantCache.getMemoryManager().allocate(estSize);
        }
        catch (InsufficientMemoryException ime) {
            LOGGER.error("Requested Map size exceeds memory limit, please decrease max size via config paramter: phoenix.groupby.maxCacheSize");
            throw ime;
        }
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Instantiating LRU groupby cache of element size: " + maxCacheSize);
        }
        this.aggregateValueToLastScannedRowKeys = Maps.newConcurrentMap();
        this.cache = new LinkedHashMap<ImmutableBytesWritable, Aggregator[]>(maxCacheSize, 0.75f, true){
            boolean spill;
            int cacheSize;
            {
                super(x0, x1, x2);
                this.spill = false;
                this.cacheSize = maxCacheSize;
            }

            @Override
            protected boolean removeEldestEntry(Map.Entry<ImmutableBytesWritable, Aggregator[]> eldest) {
                if (!this.spill && this.size() > this.cacheSize) {
                    this.cacheSize = (int)((float)this.cacheSize * 1.5f);
                    long estSize = GroupedAggregateRegionObserver.sizeOfUnorderedGroupByMap(this.cacheSize, estValueSize);
                    try {
                        SpillableGroupByCache.this.chunk.resize(estSize);
                    }
                    catch (InsufficientMemoryException im) {
                        this.spill = true;
                    }
                }
                if (this.spill) {
                    try {
                        if (SpillableGroupByCache.this.spillManager == null) {
                            SpillableGroupByCache.this.spillManager = new SpillManager(numSpillFilesConf, SpillableGroupByCache.this.aggregators, env.getConfiguration(), new QueryCache());
                        }
                        SpillableGroupByCache.this.spillManager.spill(eldest.getKey(), eldest.getValue());
                    }
                    catch (IOException ioe) {
                        try {
                            throw new RuntimeException(ioe);
                        }
                        catch (Throwable throwable) {
                            Closeables.closeQuietly((Closeable)SpillableGroupByCache.this);
                            throw throwable;
                        }
                    }
                    return true;
                }
                return false;
            }
        };
    }

    @Override
    public long size() {
        return this.totalNumElements;
    }

    @Override
    public Aggregator[] cache(ImmutableBytesPtr cacheKey) {
        ImmutableBytesPtr key = new ImmutableBytesPtr(cacheKey);
        Aggregator[] rowAggregators = this.cache.get(key);
        if (rowAggregators == null) {
            if (this.spillManager != null) {
                try {
                    rowAggregators = this.spillManager.loadEntry((ImmutableBytesWritable)key);
                }
                catch (IOException ioe) {
                    try {
                        throw new RuntimeException(ioe);
                    }
                    catch (Throwable throwable) {
                        Closeables.closeQuietly((Closeable)this);
                        throw throwable;
                    }
                }
            }
            if (rowAggregators == null) {
                rowAggregators = this.aggregators.newAggregators(this.env.getConfiguration());
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("Adding new aggregate bucket for row key " + Bytes.toStringBinary((byte[])key.get(), (int)key.getOffset(), (int)key.getLength()));
                }
            }
            if (this.cache.put((ImmutableBytesWritable)key, rowAggregators) == null) {
                ++this.totalNumElements;
            }
        }
        return rowAggregators;
    }

    @Override
    public void close() throws IOException {
        Closeables.closeQuietly((Closeable)this.spillManager);
        Closeables.closeQuietly((Closeable)this.chunk);
    }

    @Override
    public RegionScanner getScanner(final RegionScanner s) {
        final EntryIterator cacheIter = new EntryIterator();
        return new BaseRegionScanner(s){

            @Override
            public void close() throws IOException {
                try {
                    s.close();
                }
                finally {
                    Closeables.closeQuietly((Closeable)SpillableGroupByCache.this);
                }
            }

            @Override
            public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
                return this.next(result);
            }

            @Override
            public boolean next(List<Cell> results) throws IOException {
                if (!cacheIter.hasNext()) {
                    return false;
                }
                Map.Entry ce = (Map.Entry)cacheIter.next();
                ImmutableBytesWritable aggregateGroupValPtr = (ImmutableBytesWritable)ce.getKey();
                Object[] aggs = (Aggregator[])ce.getValue();
                byte[] aggregateArrayBytes = SpillableGroupByCache.this.aggregators.toBytes((Aggregator[])aggs);
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("Adding new distinct group: " + Bytes.toStringBinary((byte[])aggregateGroupValPtr.get(), (int)aggregateGroupValPtr.getOffset(), (int)aggregateGroupValPtr.getLength()) + " with aggregators " + Arrays.toString(aggs) + " value = " + Bytes.toStringBinary((byte[])aggregateArrayBytes));
                }
                if (!SpillableGroupByCache.this.isIncompatibleClient) {
                    ImmutableBytesWritable lastScannedRowKey = (ImmutableBytesWritable)SpillableGroupByCache.this.aggregateValueToLastScannedRowKeys.get(aggregateGroupValPtr);
                    byte[] aggregateGroupValueBytes = new byte[aggregateGroupValPtr.getLength()];
                    System.arraycopy(aggregateGroupValPtr.get(), aggregateGroupValPtr.getOffset(), aggregateGroupValueBytes, 0, aggregateGroupValueBytes.length);
                    byte[] finalValue = ByteUtil.concat((byte[])PInteger.INSTANCE.toBytes((Object)aggregateGroupValueBytes.length), (byte[][])new byte[][]{aggregateGroupValueBytes, aggregateArrayBytes});
                    results.add(PhoenixKeyValueUtil.newKeyValue((byte[])lastScannedRowKey.get(), (int)lastScannedRowKey.getOffset(), (int)lastScannedRowKey.getLength(), (byte[])QueryConstants.GROUPED_AGGREGATOR_VALUE_BYTES, (byte[])QueryConstants.GROUPED_AGGREGATOR_VALUE_BYTES, (long)Long.MAX_VALUE, (byte[])finalValue, (int)0, (int)finalValue.length));
                } else {
                    results.add(PhoenixKeyValueUtil.newKeyValue((byte[])aggregateGroupValPtr.get(), (int)aggregateGroupValPtr.getOffset(), (int)aggregateGroupValPtr.getLength(), (byte[])QueryConstants.SINGLE_COLUMN_FAMILY, (byte[])QueryConstants.SINGLE_COLUMN, (long)Long.MAX_VALUE, (byte[])aggregateArrayBytes, (int)0, (int)aggregateArrayBytes.length));
                }
                return cacheIter.hasNext();
            }
        };
    }

    @Override
    public void cacheAggregateRowKey(ImmutableBytesPtr value, ImmutableBytesPtr rowKey) {
        this.aggregateValueToLastScannedRowKeys.put((ImmutableBytesWritable)value, (ImmutableBytesWritable)rowKey);
    }

    private final class EntryIterator
    implements Iterator<Map.Entry<ImmutableBytesWritable, Aggregator[]>> {
        final Iterator<Map.Entry<ImmutableBytesWritable, Aggregator[]>> cacheIter;
        final Iterator<byte[]> spilledCacheIter;

        private EntryIterator() {
            this.cacheIter = SpillableGroupByCache.this.cache.entrySet().iterator();
            this.spilledCacheIter = SpillableGroupByCache.this.spillManager != null ? SpillableGroupByCache.this.spillManager.newDataIterator() : null;
        }

        @Override
        public boolean hasNext() {
            return this.cacheIter.hasNext();
        }

        @Override
        public Map.Entry<ImmutableBytesWritable, Aggregator[]> next() {
            if (this.spilledCacheIter != null && this.spilledCacheIter.hasNext()) {
                try {
                    byte[] value = this.spilledCacheIter.next();
                    SpillManager.CacheEntry<Object> spilledEntry = SpillableGroupByCache.this.spillManager.toCacheEntry(value);
                    boolean notFound = false;
                    while (SpillableGroupByCache.this.cache.containsKey(spilledEntry.getKey())) {
                        if (this.spilledCacheIter.hasNext()) {
                            value = this.spilledCacheIter.next();
                            spilledEntry = SpillableGroupByCache.this.spillManager.toCacheEntry(value);
                            continue;
                        }
                        notFound = true;
                        break;
                    }
                    if (!notFound) {
                        return spilledEntry;
                    }
                }
                catch (IOException ioe) {
                    throw new RuntimeException(ioe);
                }
            }
            Map.Entry<ImmutableBytesWritable, Aggregator[]> entry = this.cacheIter.next();
            return new SpillManager.CacheEntry<ImmutableBytesWritable>(entry.getKey(), entry.getValue());
        }

        @Override
        public void remove() {
            throw new IllegalAccessError("Remove is not supported for this type of iterator");
        }
    }

    public class QueryCache {
        public boolean isKeyContained(ImmutableBytesPtr key) {
            return SpillableGroupByCache.this.cache.containsKey(key);
        }
    }
}

