/*
 * Decompiled with CFR 0.152.
 */
package org.apache.phoenix.cache;

import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.compat.hbase.ByteStringer;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.compile.ScanRanges;
import org.apache.phoenix.coprocessor.generated.ServerCacheFactoryProtos;
import org.apache.phoenix.coprocessor.generated.ServerCachingProtos;
import org.apache.phoenix.coprocessorclient.MetaDataProtocol;
import org.apache.phoenix.coprocessorclient.ServerCachingProtocol;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.job.JobManager;
import org.apache.phoenix.join.HashCacheFactory;
import org.apache.phoenix.memory.InsufficientMemoryException;
import org.apache.phoenix.memory.MemoryManager;
import org.apache.phoenix.monitoring.TaskExecutionMetricsHolder;
import org.apache.phoenix.protobuf.ProtobufUtil;
import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.util.Closeables;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.LogUtil;
import org.apache.phoenix.util.SQLCloseable;
import org.apache.phoenix.util.SQLCloseables;
import org.apache.phoenix.util.ScanUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ServerCacheClient {
    // Presumably the byte length of a cache id: generateId() encodes a long and
    // idToString() asserts a length of 8. Not referenced elsewhere in this file.
    public static final int UUID_LENGTH = 8;
    // Row key substituted by getKeyInRegion() when a region's start key is the empty start row.
    public static final byte[] KEY_IN_FIRST_REGION = new byte[]{0};
    private static final Logger LOGGER = LoggerFactory.getLogger(ServerCacheClient.class);
    // Shared source of randomness for generateId().
    private static final Random RANDOM = new Random();
    // Boolean property (default false where it is read) that makes the per-region
    // addServerCache overload resend the cache even to a location already recorded in the ServerCache.
    public static final String HASH_JOIN_SERVER_CACHE_RESEND_PER_SERVER = "hash.join.server.cache.resend.per.server";
    private final PhoenixConnection connection;
    // Table each cache was created against, keyed by Bytes.mapKey(cacheId). Entries are added when a
    // cache is created/added and dropped by removeServerCache(), which needs the table to locate regions.
    private final Map<Integer, PTable> cacheUsingTableMap = new ConcurrentHashMap<Integer, PTable>();

    /**
     * Creates a client bound to the given connection.
     *
     * @param connection connection whose query services, tenant id and log annotations are used
     *                   by every operation of this client
     */
    public ServerCacheClient(PhoenixConnection connection) {
        this.connection = connection;
    }

    /**
     * @return the connection this client was constructed with
     */
    public PhoenixConnection getConnection() {
        return connection;
    }

    /**
     * Creates a {@link ServerCache} handle for an existing cache id without sending anything
     * to the region servers.
     *
     * <p>The handle starts with no recorded server locations and an empty cache pointer. The
     * plan's table is registered under the cache id so the cache can later be removed.
     *
     * @param cacheId  id of the cache the handle represents
     * @param delegate plan supplying the table, the query services and the statement timeout
     * @return a new handle that does not store the cache contents on the client
     * @throws SQLException if the region lookup fails
     * @throws IOException  if constructing the handle fails
     */
    public ServerCache createServerCache(byte[] cacheId, QueryPlan delegate) throws SQLException, IOException {
        PTable table = delegate.getTableRef().getTable();
        ConnectionQueryServices queryServices = delegate.getContext().getConnection().getQueryServices();
        // The region list is only used to pick an initial capacity for the (empty) server set.
        int regionCount = queryServices
                .getAllTableRegions(table.getPhysicalName().getBytes(), delegate.getContext().getStatement().getQueryTimeoutInMillis())
                .size();
        HashSet<HRegionLocation> noServers = new HashSet<HRegionLocation>(regionCount);
        this.cacheUsingTableMap.put(Bytes.mapKey(cacheId), table);
        ImmutableBytesWritable emptyPtr = new ImmutableBytesWritable(new byte[0]);
        return new ServerCache(cacheId, noServers, emptyPtr, queryServices, false);
    }

    /**
     * Adds a server cache under a freshly generated id without keeping a copy on the client.
     * Equivalent to the six-argument overload with {@code storeCacheOnClient == false}.
     */
    public ServerCache addServerCache(ScanRanges keyRanges, ImmutableBytesWritable cachePtr, byte[] txState, ServerCachingProtocol.ServerCacheFactory cacheFactory, PTable cacheUsingTable) throws SQLException {
        final boolean storeCacheOnClient = false;
        return addServerCache(keyRanges, cachePtr, txState, cacheFactory, cacheUsingTable, storeCacheOnClient);
    }

    /**
     * Adds a server cache under a freshly generated id, never using the persistent cache.
     *
     * @param storeCacheOnClient whether the returned handle should keep the cache contents
     *                           on the client
     */
    public ServerCache addServerCache(ScanRanges keyRanges, ImmutableBytesWritable cachePtr, byte[] txState, ServerCachingProtocol.ServerCacheFactory cacheFactory, PTable cacheUsingTable, boolean storeCacheOnClient) throws SQLException {
        final boolean usePersistentCache = false;
        return addServerCache(keyRanges, generateId(), cachePtr, txState, cacheFactory, cacheUsingTable, usePersistentCache, storeCacheOnClient);
    }

    /**
     * Sends the cache to the regions of {@code cacheUsingTable} whose key range intersects
     * {@code keyRanges} and waits for all sends to finish.
     *
     * <p>One task per selected region location is submitted to the query services executor;
     * each task issues the add-cache RPC through its own {@link Table}. The boolean result of
     * the individual tasks is not inspected, only their completion. On success the table is
     * registered under the cache id so the cache can later be removed.
     *
     * <p>If anything fails, the partially built cache handle is closed, outstanding tasks are
     * cancelled and the failure is rethrown. The tables opened for the tasks are closed in all
     * cases.
     *
     * @param keyRanges          key ranges used to select the regions that need the cache
     * @param cacheId            id under which the cache is stored
     * @param cachePtr           serialized cache contents
     * @param txState            transaction state forwarded with the request
     * @param cacheFactory       factory whose class name is sent to the server
     * @param cacheUsingTable    table whose regions receive the cache
     * @param usePersistentCache forwarded to the server in the request
     * @param storeCacheOnClient whether the returned handle keeps the contents on the client
     * @return handle for the added cache; closing it removes the cache from the servers
     * @throws SQLException if locating regions, sending the cache or closing the tables fails;
     *                      non-SQL failures are wrapped
     */
    public ServerCache addServerCache(ScanRanges keyRanges, final byte[] cacheId, final ImmutableBytesWritable cachePtr, final byte[] txState, final ServerCachingProtocol.ServerCacheFactory cacheFactory, final PTable cacheUsingTable, final boolean usePersistentCache, boolean storeCacheOnClient) throws SQLException {
        ConnectionQueryServices services = this.connection.getQueryServices();
        List<Table> closeables = new ArrayList<Table>();
        ServerCache hashCacheSpec = null;
        SQLException firstException = null;
        boolean success = false;
        ThreadPoolExecutor executor = services.getExecutor();
        List<Future<?>> futures = Collections.emptyList();
        try {
            int queryTimeout = services.getProps().getInt("phoenix.query.timeoutMs", 600000);
            List<HRegionLocation> locations = services.getAllTableRegions(cacheUsingTable.getPhysicalName().getBytes(), queryTimeout);
            int nRegions = locations.size();
            // Sized for the worst case of every region needing the cache.
            futures = new ArrayList<Future<?>>(nRegions);
            Set<HRegionLocation> servers = new HashSet<HRegionLocation>(nRegions);
            for (HRegionLocation entry : locations) {
                byte[] regionStartKey = entry.getRegion().getStartKey();
                byte[] regionEndKey = entry.getRegion().getEndKey();
                boolean isLocalIndex = cacheUsingTable.getIndexType() == PTable.IndexType.LOCAL;
                if (!servers.contains(entry) && keyRanges.intersectRegion(regionStartKey, regionEndKey, isLocalIndex)) {
                    // Remember the location so it is sent to only once.
                    servers.add(entry);
                    if (LOGGER.isDebugEnabled()) {
                        LOGGER.debug(LogUtil.addCustomAnnotations("Adding cache entry to be sent for " + entry, this.connection));
                    }
                    final byte[] key = ServerCacheClient.getKeyInRegion(regionStartKey);
                    final Table htable = services.getTable(cacheUsingTable.getPhysicalName().getBytes());
                    closeables.add(htable);
                    futures.add(executor.submit(new JobManager.JobCallable<Boolean>(){

                        @Override
                        public Boolean call() throws Exception {
                            return ServerCacheClient.this.addServerCache(htable, key, cacheUsingTable, cacheId, cachePtr, cacheFactory, txState, usePersistentCache);
                        }

                        @Override
                        public Object getJobId() {
                            return ServerCacheClient.this;
                        }

                        @Override
                        public TaskExecutionMetricsHolder getTaskExecutionMetric() {
                            return TaskExecutionMetricsHolder.NO_OP_INSTANCE;
                        }
                    }));
                } else if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug(LogUtil.addCustomAnnotations("NOT adding cache entry to be sent for " + entry + " since one already exists for that entry", this.connection));
                }
            }
            hashCacheSpec = new ServerCache(cacheId, servers, cachePtr, services, storeCacheOnClient);
            // Each wait gets the full timeout, as in the original code.
            for (Future<?> future : futures) {
                future.get(queryTimeout, TimeUnit.MILLISECONDS);
            }
            this.cacheUsingTableMap.put(Bytes.mapKey(cacheId), cacheUsingTable);
            success = true;
        }
        catch (SQLException e) {
            firstException = e;
        }
        catch (InterruptedException e) {
            // Preserve the interrupt for callers further up the stack.
            Thread.currentThread().interrupt();
            firstException = new SQLException(e);
        }
        catch (Exception e) {
            firstException = new SQLException(e);
        }
        finally {
            try {
                // Only tear down on failure; on success the caller owns the returned handle.
                if (!success) {
                    if (hashCacheSpec != null) {
                        SQLCloseables.closeAllQuietly(Collections.singletonList(hashCacheSpec));
                    }
                    for (Future<?> future : futures) {
                        future.cancel(true);
                    }
                }
            }
            finally {
                try {
                    Closeables.closeAll(closeables);
                }
                catch (IOException e) {
                    // A close failure is only reported when nothing else went wrong first.
                    if (firstException == null) {
                        firstException = new SQLException(e);
                    }
                }
                finally {
                    if (firstException != null) {
                        throw firstException;
                    }
                }
            }
        }
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(LogUtil.addCustomAnnotations("Cache " + Bytes.toStringBinary(cacheId) + " successfully added to servers.", this.connection));
        }
        return hashCacheSpec;
    }

    /**
     * Asks every region location still holding the cache to drop it.
     *
     * <p>Removal is best effort: a failure for one location is logged and the remaining
     * locations are still tried. Locations whose removal succeeded are deleted from
     * {@code remainingOnServers}. The table registration for the cache id is dropped and the
     * table used for the RPCs is closed whether or not removal succeeded.
     *
     * <p>If no table is registered for the cache id (for example the cache was already removed,
     * or adding it failed before registration) there is no way to locate the regions, so the
     * call logs a warning and returns instead of failing with a {@code NullPointerException}.
     *
     * @param cache              cache to remove
     * @param remainingOnServers locations believed to hold the cache; mutated by this method
     * @throws SQLException if the table or its region locations cannot be obtained
     */
    private void removeServerCache(ServerCache cache, Set<HRegionLocation> remainingOnServers) throws SQLException {
        final byte[] cacheId = cache.getId();
        Table iterateOverTable = null;
        try {
            ConnectionQueryServices services = this.connection.getQueryServices();
            Throwable lastThrowable = null;
            final PTable cacheUsingTable = this.cacheUsingTableMap.get(Bytes.mapKey(cacheId));
            if (cacheUsingTable == null) {
                LOGGER.warn(LogUtil.addCustomAnnotations("No table registered for cache " + Bytes.toStringBinary(cacheId) + "; skipping removal from servers.", this.connection));
                return;
            }
            byte[] tableName = cacheUsingTable.getPhysicalName().getBytes();
            iterateOverTable = services.getTable(tableName);
            int queryTimeout = services.getProps().getInt("phoenix.query.timeoutMs", 600000);
            List<HRegionLocation> locations = services.getAllTableRegions(tableName, queryTimeout);
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug(LogUtil.addCustomAnnotations("Removing Cache " + Bytes.toStringBinary(cacheId) + " from servers.", this.connection));
            }
            for (HRegionLocation entry : locations) {
                if (!remainingOnServers.contains(entry)) {
                    continue;
                }
                try {
                    byte[] key = ServerCacheClient.getKeyInRegion(entry.getRegion().getStartKey());
                    iterateOverTable.coprocessorService(ServerCachingProtos.ServerCachingService.class, key, key, new Batch.Call<ServerCachingProtos.ServerCachingService, ServerCachingProtos.RemoveServerCacheResponse>(){

                        @Override
                        public ServerCachingProtos.RemoveServerCacheResponse call(ServerCachingProtos.ServerCachingService instance) throws IOException {
                            ServerRpcController controller = new ServerRpcController();
                            CoprocessorRpcUtils.BlockingRpcCallback<ServerCachingProtos.RemoveServerCacheResponse> rpcCallback = new CoprocessorRpcUtils.BlockingRpcCallback<ServerCachingProtos.RemoveServerCacheResponse>();
                            ServerCachingProtos.RemoveServerCacheRequest.Builder builder = ServerCachingProtos.RemoveServerCacheRequest.newBuilder();
                            byte[] tenantIdBytes;
                            if (cacheUsingTable.isMultiTenant()) {
                                try {
                                    tenantIdBytes = ServerCacheClient.this.connection.getTenantId() == null ? null : ScanUtil.getTenantIdBytes(cacheUsingTable.getRowKeySchema(), cacheUsingTable.getBucketNum() != null, ServerCacheClient.this.connection.getTenantId(), cacheUsingTable.getViewIndexId() != null);
                                }
                                catch (SQLException e) {
                                    // Batch.Call may only throw IOException.
                                    throw new IOException(e);
                                }
                            } else {
                                tenantIdBytes = ServerCacheClient.this.connection.getTenantId() == null ? null : ServerCacheClient.this.connection.getTenantId().getBytes();
                            }
                            if (tenantIdBytes != null) {
                                builder.setTenantId(ByteStringer.wrap(tenantIdBytes));
                            }
                            builder.setCacheId(ByteStringer.wrap(cacheId));
                            instance.removeServerCache(controller, builder.build(), rpcCallback);
                            if (controller.getFailedOn() != null) {
                                throw controller.getFailedOn();
                            }
                            return rpcCallback.get();
                        }
                    });
                    remainingOnServers.remove(entry);
                }
                catch (Throwable t) {
                    // Keep going: one unreachable server must not prevent cleanup on the others.
                    lastThrowable = t;
                    LOGGER.error(LogUtil.addCustomAnnotations("Error trying to remove hash cache for " + entry, this.connection), t);
                }
            }
            if (!remainingOnServers.isEmpty()) {
                LOGGER.warn(LogUtil.addCustomAnnotations("Unable to remove hash cache for " + remainingOnServers, this.connection), lastThrowable);
            }
        }
        finally {
            this.cacheUsingTableMap.remove(Bytes.mapKey(cacheId));
            Closeables.closeQuietly(iterateOverTable);
        }
    }

    /**
     * Generates a new cache id.
     *
     * @return the byte encoding of a random {@code long}
     */
    public static byte[] generateId() {
        return Bytes.toBytes(RANDOM.nextLong());
    }

    /**
     * Renders a cache id as the decimal form of the {@code long} it encodes.
     *
     * @param uuid cache id; expected to be {@link #UUID_LENGTH} bytes long (checked only when
     *             assertions are enabled)
     * @return the decimal string of the decoded value
     */
    public static String idToString(byte[] uuid) {
        assert uuid.length == UUID_LENGTH;
        long decoded = Bytes.toLong(uuid);
        return Long.toString(decoded);
    }

    /**
     * Picks a row key that can be used to address the region starting at
     * {@code regionStartKey}.
     *
     * @param regionStartKey start key of the region; must not be null
     * @return {@link #KEY_IN_FIRST_REGION} when the start key is the empty start row,
     *         otherwise the start key itself
     */
    private static byte[] getKeyInRegion(byte[] regionStartKey) {
        assert regionStartKey != null;
        boolean isFirstRegion = Bytes.equals(regionStartKey, HConstants.EMPTY_START_ROW);
        return isFirstRegion ? KEY_IN_FIRST_REGION : regionStartKey;
    }

    /**
     * Sends an existing cache to the region containing {@code startkeyOfRegion}, if needed.
     *
     * <p>Nothing is sent and {@code false} is returned when the cache is already recorded as
     * expired for that region location. Otherwise the cache is sent when the location was not
     * yet recorded in {@code cache}, or when the
     * {@link #HASH_JOIN_SERVER_CACHE_RESEND_PER_SERVER} property is true. The table opened for
     * the call is closed on every path, including when sending throws.
     *
     * @param startkeyOfRegion start key identifying the target region
     * @param cache            handle of the cache to send
     * @param cacheFactory     factory whose class name is sent to the server
     * @param txState          transaction state forwarded with the request
     * @param pTable           table whose region receives the cache
     * @return {@code false} if the cache has expired for the location; the result of the send
     *         if one was made; {@code true} if no send was necessary
     * @throws Exception if the table or region lookup or the send fails
     */
    public boolean addServerCache(byte[] startkeyOfRegion, ServerCache cache, HashCacheFactory cacheFactory, byte[] txState, PTable pTable) throws Exception {
        Table table = null;
        boolean success = true;
        byte[] cacheId = cache.getId();
        try {
            ConnectionQueryServices services = this.connection.getQueryServices();
            byte[] tableName = pTable.getPhysicalName().getBytes();
            table = services.getTable(tableName);
            HRegionLocation tableRegionLocation = services.getTableRegionLocation(tableName, startkeyOfRegion);
            if (cache.isExpired(tableRegionLocation)) {
                return false;
            }
            if (cache.addServer(tableRegionLocation) || services.getProps().getBoolean(HASH_JOIN_SERVER_CACHE_RESEND_PER_SERVER, false)) {
                success = this.addServerCache(table, startkeyOfRegion, pTable, cacheId, cache.getCachePtr(), cacheFactory, txState, false);
            }
            return success;
        }
        finally {
            // Covers the send as well, so a failed RPC no longer leaks the table.
            Closeables.closeQuietly(table);
        }
    }

    /**
     * Issues the add-cache coprocessor RPC for a single region through {@code htable}.
     *
     * <p>The request carries the tenant id (when the connection has one), the cache id and
     * contents, the cache factory class name, the transaction state and the client version.
     *
     * @param htable             table used to reach the region
     * @param key                key in the target region; an empty start row is replaced by
     *                           {@link #KEY_IN_FIRST_REGION}
     * @param cacheUsingTable    table the cache is used with; decides how the tenant id is encoded
     * @param cacheId            id under which the cache is stored
     * @param cachePtr           serialized cache contents
     * @param cacheFactory       factory whose class name is sent to the server
     * @param txState            transaction state forwarded with the request
     * @param usePersistentCache forwarded to the server in the request
     * @return the server's response flag when exactly one region answered, {@code false} otherwise
     * @throws Exception if building the request fails or the RPC throws; RPC failures are wrapped
     */
    public boolean addServerCache(Table htable, byte[] key, PTable cacheUsingTable, byte[] cacheId, ImmutableBytesWritable cachePtr, ServerCachingProtocol.ServerCacheFactory cacheFactory, byte[] txState, boolean usePersistentCache) throws Exception {
        byte[] keyInRegion = ServerCacheClient.getKeyInRegion(key);
        ServerCachingProtos.AddServerCacheRequest.Builder requestBuilder = ServerCachingProtos.AddServerCacheRequest.newBuilder();

        byte[] tenantIdBytes;
        if (!cacheUsingTable.isMultiTenant()) {
            tenantIdBytes = this.connection.getTenantId() == null ? null : this.connection.getTenantId().getBytes();
        } else {
            try {
                if (this.connection.getTenantId() == null) {
                    tenantIdBytes = null;
                } else {
                    tenantIdBytes = ScanUtil.getTenantIdBytes(cacheUsingTable.getRowKeySchema(), cacheUsingTable.getBucketNum() != null, this.connection.getTenantId(), cacheUsingTable.getViewIndexId() != null);
                }
            }
            catch (SQLException e) {
                throw new IOException(e);
            }
        }
        if (tenantIdBytes != null) {
            requestBuilder.setTenantId(ByteStringer.wrap(tenantIdBytes));
        }

        requestBuilder.setCacheId(ByteStringer.wrap(cacheId));
        requestBuilder.setUsePersistentCache(usePersistentCache);
        requestBuilder.setCachePtr(ProtobufUtil.toProto(cachePtr));
        requestBuilder.setHasProtoBufIndexMaintainer(true);
        ServerCacheFactoryProtos.ServerCacheFactory.Builder factoryBuilder = ServerCacheFactoryProtos.ServerCacheFactory.newBuilder();
        factoryBuilder.setClassName(cacheFactory.getClass().getName());
        requestBuilder.setCacheFactory(factoryBuilder.build());
        requestBuilder.setTxState(ByteStringer.wrap(txState));
        requestBuilder.setClientVersion(MetaDataProtocol.PHOENIX_VERSION);
        final ServerCachingProtos.AddServerCacheRequest request = requestBuilder.build();

        Map<byte[], ServerCachingProtos.AddServerCacheResponse> responses;
        try {
            responses = htable.coprocessorService(ServerCachingProtos.ServerCachingService.class, keyInRegion, keyInRegion, new Batch.Call<ServerCachingProtos.ServerCachingService, ServerCachingProtos.AddServerCacheResponse>(){

                @Override
                public ServerCachingProtos.AddServerCacheResponse call(ServerCachingProtos.ServerCachingService instance) throws IOException {
                    ServerRpcController controller = new ServerRpcController();
                    CoprocessorRpcUtils.BlockingRpcCallback<ServerCachingProtos.AddServerCacheResponse> rpcCallback = new CoprocessorRpcUtils.BlockingRpcCallback<ServerCachingProtos.AddServerCacheResponse>();
                    instance.addServerCache(controller, request, rpcCallback);
                    if (controller.getFailedOn() != null) {
                        throw controller.getFailedOn();
                    }
                    return rpcCallback.get();
                }
            });
        }
        catch (Throwable t) {
            throw new Exception(t);
        }
        // Anything other than exactly one answering region is treated as "not added".
        if (responses == null || responses.size() != 1) {
            return false;
        }
        return responses.values().iterator().next().getReturn();
    }

    /**
     * Client-side handle for a cache that has been (or will be) sent to region servers.
     *
     * <p>Tracks the region locations the cache was sent to, together with the time each was
     * recorded, and optionally keeps the cache contents on the client: in memory when the
     * memory manager can allocate a chunk for them, otherwise spooled to a temporary file.
     * Closing the handle removes the cache from the servers and releases the client-side copy.
     */
    public class ServerCache
    implements SQLCloseable {
        // Length in bytes of the cache contents; also the expected length of the spool file.
        private final int size;
        private final byte[] id;
        // Region location -> time (ms) at which the location was recorded.
        private final Map<HRegionLocation, Long> servers;
        private ImmutableBytesWritable cachePtr;
        private MemoryManager.MemoryChunk chunk;
        // Non-null only when the contents were spooled to disk.
        private File outputFile;
        private final long maxServerCacheTTL;

        /**
         * @param id                 cache id
         * @param servers            locations the cache is being sent to; all are recorded
         *                           with the current time
         * @param cachePtr           cache contents
         * @param services           source of configuration and the memory manager
         * @param storeCacheOnClient whether to keep the contents on the client
         * @throws IOException if the contents must be spooled to disk and that fails
         */
        public ServerCache(byte[] id, Set<HRegionLocation> servers, ImmutableBytesWritable cachePtr, ConnectionQueryServices services, boolean storeCacheOnClient) throws IOException {
            this.maxServerCacheTTL = services.getProps().getInt("phoenix.coprocessor.maxServerCacheTimeToLiveMs", 30000);
            this.id = id;
            this.servers = new HashMap<HRegionLocation, Long>();
            long currentTime = EnvironmentEdgeManager.currentTimeMillis();
            for (HRegionLocation loc : servers) {
                this.servers.put(loc, currentTime);
            }
            this.size = cachePtr.getLength();
            if (storeCacheOnClient) {
                try {
                    this.chunk = services.getMemoryManager().allocate(cachePtr.getLength());
                    this.cachePtr = cachePtr;
                }
                catch (InsufficientMemoryException e) {
                    // Not enough tracked memory: fall back to a spool file.
                    File spoolDir = new File(services.getProps().get("phoenix.spool.directory", QueryServicesOptions.DEFAULT_SPOOL_DIRECTORY));
                    this.outputFile = File.createTempFile("HashJoinCacheSpooler", ".bin", spoolDir);
                    try (OutputStream fio = Files.newOutputStream(this.outputFile.toPath(), new OpenOption[0])) {
                        fio.write(cachePtr.get(), cachePtr.getOffset(), cachePtr.getLength());
                    }
                    catch (IOException writeFailure) {
                        // The handle is never returned in this case, so nobody else would delete the file.
                        this.outputFile.delete();
                        throw writeFailure;
                    }
                }
            }
        }

        /**
         * Returns the cache contents held on the client.
         *
         * <p>When the contents were spooled to disk they are re-read from the file on every
         * call; otherwise the in-memory pointer is returned, which is null if the contents
         * were not stored on the client or the handle has been closed.
         *
         * @throws IOException if the spool file cannot be read or is shorter than expected
         */
        public ImmutableBytesWritable getCachePtr() throws IOException {
            if (this.outputFile != null) {
                try (InputStream fio = Files.newInputStream(this.outputFile.toPath(), new OpenOption[0])) {
                    byte[] b = new byte[this.size];
                    // A single read() may return fewer bytes than requested, so loop until full.
                    int filled = 0;
                    while (filled < b.length) {
                        int n = fio.read(b, filled, b.length - filled);
                        if (n < 0) {
                            throw new IOException("Unexpected end of spooled cache file " + this.outputFile + ": read " + filled + " of " + b.length + " bytes");
                        }
                        filled += n;
                    }
                    this.cachePtr = new ImmutableBytesWritable(b);
                }
            }
            return this.cachePtr;
        }

        /** @return length in bytes of the cache contents */
        public int getSize() {
            return this.size;
        }

        /** @return the cache id */
        public byte[] getId() {
            return this.id;
        }

        /**
         * Records that the cache is being sent to {@code loc}.
         *
         * @return {@code true} if the location was newly recorded, {@code false} if it was
         *         already known (its recorded time is left unchanged)
         */
        public boolean addServer(HRegionLocation loc) {
            if (this.servers.containsKey(loc)) {
                return false;
            }
            this.servers.put(loc, EnvironmentEdgeManager.currentTimeMillis());
            return true;
        }

        /**
         * @return {@code true} if {@code loc} is recorded and was recorded longer ago than the
         *         configured maximum server cache time to live; {@code false} for unknown locations
         */
        public boolean isExpired(HRegionLocation loc) {
            Long recordedAt = this.servers.get(loc);
            if (recordedAt == null) {
                return false;
            }
            return EnvironmentEdgeManager.currentTimeMillis() - recordedAt > this.maxServerCacheTTL;
        }

        /**
         * Removes the cache from the recorded servers and releases the client-side copy
         * (memory chunk and/or spool file), even if the removal fails.
         */
        @Override
        public void close() throws SQLException {
            try {
                // keySet() is a live view: successful removals also drop the location here.
                ServerCacheClient.this.removeServerCache(this, this.servers.keySet());
            }
            finally {
                this.cachePtr = null;
                if (this.chunk != null) {
                    this.chunk.close();
                }
                if (this.outputFile != null) {
                    this.outputFile.delete();
                }
            }
        }
    }
}

