/*
 * Decompiled with CFR 0.152.
 */
package org.apache.phoenix.coprocessor;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.hbase.index.ValueGetter;
import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.schema.types.PDate;
import org.apache.phoenix.thirdparty.com.google.common.cache.Cache;
import org.apache.phoenix.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.phoenix.util.CDCUtil;
import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.JacksonUtil;
import org.apache.phoenix.util.QueryUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class CDCCompactionUtil {
    private static final Logger LOGGER = LoggerFactory.getLogger(CDCCompactionUtil.class);
    private static volatile Cache<ImmutableBytesPtr, Map<String, Object>> sharedTtlImageCache;

    private CDCCompactionUtil() {
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    static Cache<ImmutableBytesPtr, Map<String, Object>> getSharedRowImageCache(Configuration config) {
        if (sharedTtlImageCache != null) return sharedTtlImageCache;
        Class<CDCCompactionUtil> clazz = CDCCompactionUtil.class;
        synchronized (CDCCompactionUtil.class) {
            if (sharedTtlImageCache != null) return sharedTtlImageCache;
            int expirySeconds = config.getInt("phoenix.cdc.ttl.shared.cache.expiry.seconds", 1200);
            sharedTtlImageCache = CacheBuilder.newBuilder().expireAfterWrite((long)expirySeconds, TimeUnit.SECONDS).build();
            LOGGER.info("Initialized shared CDC row image cache with expiry of {} seconds", (Object)expirySeconds);
            // ** MonitorExit[var1_1] (shouldn't be in output)
            return sharedTtlImageCache;
        }
    }

    private static String findColumnName(PTable dataTable, Cell cell) {
        try {
            byte[] family = CellUtil.cloneFamily((Cell)cell);
            byte[] qualifier = CellUtil.cloneQualifier((Cell)cell);
            byte[] defaultCf = dataTable.getDefaultFamilyName() != null ? dataTable.getDefaultFamilyName().getBytes() : QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES;
            for (PColumn column : dataTable.getColumns()) {
                if (column.getFamilyName() == null || !Bytes.equals((byte[])family, (byte[])column.getFamilyName().getBytes()) || !Bytes.equals((byte[])qualifier, (byte[])column.getColumnQualifierBytes())) continue;
                if (Bytes.equals((byte[])defaultCf, (byte[])column.getFamilyName().getBytes())) {
                    return column.getName().getString();
                }
                return column.getFamilyName().getString() + "." + column.getName().getString();
            }
        }
        catch (Exception e) {
            LOGGER.error("Error finding column name for cell: {}", (Object)CellUtil.toString((Cell)cell, (boolean)true), (Object)e);
        }
        return null;
    }

    private static Map<String, Object> createTTLDeleteCDCEvent(Put expiredRowPut, PTable dataTable, Map<String, Object> preImage) throws Exception {
        HashMap<String, Object> cdcEvent = new HashMap<String, Object>();
        cdcEvent.put("event_type", "ttl_delete");
        for (List familyCells : expiredRowPut.getFamilyCellMap().values()) {
            for (Cell cell : familyCells) {
                String columnName = CDCCompactionUtil.findColumnName(dataTable, cell);
                if (columnName == null) continue;
                PColumn column = dataTable.getColumnForColumnQualifier(CellUtil.cloneFamily((Cell)cell), CellUtil.cloneQualifier((Cell)cell));
                Object value = column.getDataType().toObject(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
                Object encodedValue = CDCUtil.getColumnEncodedValue((Object)value, (PDataType)column.getDataType());
                preImage.put(columnName, encodedValue);
            }
        }
        cdcEvent.put("pre_image", preImage);
        return cdcEvent;
    }

    private static Put buildCDCIndexPut(long eventTimestamp, byte[] cdcEventBytes, byte[] rowKey, IndexMaintainer cdcIndexMaintainer) {
        Put newCdcIndexPut = new Put(rowKey, eventTimestamp);
        newCdcIndexPut.addColumn(cdcIndexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary(), cdcIndexMaintainer.getEmptyKeyValueQualifier(), eventTimestamp, QueryConstants.UNVERIFIED_BYTES);
        newCdcIndexPut.addColumn(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES, QueryConstants.CDC_IMAGE_CQ_BYTES, eventTimestamp, cdcEventBytes);
        return newCdcIndexPut;
    }

    public static CDCBatchProcessor createBatchProcessor(PTable dataTable, RegionCoprocessorEnvironment env, Region region, byte[] compactionTimeBytes, long compactionTime, String tableName, int cdcTtlMutationMaxRetries, int batchSize) {
        PTable cdcIndex = CDCUtil.getActiveCDCIndex((PTable)dataTable);
        if (cdcIndex == null) {
            LOGGER.warn("No active CDC index found for table {}", (Object)tableName);
            return null;
        }
        return new CDCBatchProcessor(cdcIndex, dataTable, env, region, compactionTimeBytes, compactionTime, tableName, cdcTtlMutationMaxRetries, batchSize);
    }

    static void handleTTLRowExpiration(List<Cell> expiredRow, String expirationType, String tableName, CDCBatchProcessor batchProcessor) {
        if (batchProcessor == null) {
            return;
        }
        try {
            Cell firstCell = expiredRow.get(0);
            byte[] rowKey = CellUtil.cloneRow((Cell)firstCell);
            LOGGER.debug("TTL row expiration detected: table={}, rowKey={}, expirationType={}, cellCount={}, compactionTime={}", new Object[]{tableName, Bytes.toStringBinary((byte[])rowKey), expirationType, expiredRow.size(), batchProcessor.eventTimestamp});
            batchProcessor.addCDCEvent(expiredRow);
        }
        catch (Exception e) {
            LOGGER.error("Error handling TTL row expiration for CDC: table {}", (Object)tableName, (Object)e);
        }
    }

    public static class CDCBatchProcessor {
        private final Map<ImmutableBytesPtr, Put> pendingMutations = new HashMap<ImmutableBytesPtr, Put>();
        private final PTable cdcIndex;
        private final PTable dataTable;
        private final RegionCoprocessorEnvironment env;
        private final Region region;
        private final byte[] compactionTimeBytes;
        private final long eventTimestamp;
        private final String tableName;
        private final int cdcTtlMutationMaxRetries;
        private final int batchSize;
        private final Configuration config;

        public CDCBatchProcessor(PTable cdcIndex, PTable dataTable, RegionCoprocessorEnvironment env, Region region, byte[] compactionTimeBytes, long eventTimestamp, String tableName, int cdcTtlMutationMaxRetries, int batchSize) {
            this.cdcIndex = cdcIndex;
            this.dataTable = dataTable;
            this.env = env;
            this.region = region;
            this.compactionTimeBytes = compactionTimeBytes;
            this.eventTimestamp = eventTimestamp;
            this.tableName = tableName;
            this.cdcTtlMutationMaxRetries = cdcTtlMutationMaxRetries;
            this.batchSize = batchSize;
            this.config = env.getConfiguration();
        }

        public void addCDCEvent(List<Cell> expiredRow) throws Exception {
            byte[] rowKey;
            IndexMaintainer cdcIndexMaintainer;
            Cell firstCell = expiredRow.get(0);
            byte[] dataRowKey = CellUtil.cloneRow((Cell)firstCell);
            Put expiredRowPut = new Put(dataRowKey);
            for (Cell cell : expiredRow) {
                expiredRowPut.add(cell);
            }
            try (PhoenixConnection serverConnection = QueryUtil.getConnectionOnServer((Properties)new Properties(), (Configuration)this.env.getConfiguration()).unwrap(PhoenixConnection.class);){
                cdcIndexMaintainer = this.cdcIndex.getIndexMaintainer(this.dataTable, serverConnection);
                IndexUtil.SimpleValueGetter dataRowVG = new IndexUtil.SimpleValueGetter(expiredRowPut);
                ImmutableBytesPtr rowKeyPtr = new ImmutableBytesPtr(expiredRowPut.getRow());
                Put cdcIndexPut = cdcIndexMaintainer.buildUpdateMutation(GenericKeyValueBuilder.INSTANCE, (ValueGetter)dataRowVG, (ImmutableBytesWritable)rowKeyPtr, this.eventTimestamp, null, null, false, this.region.getRegionInfo().getEncodedNameAsBytes());
                rowKey = (byte[])cdcIndexPut.getRow().clone();
                System.arraycopy(this.compactionTimeBytes, 0, rowKey, 32, PDate.INSTANCE.getByteSize());
            }
            byte[] rowKeyWithoutTimestamp = new byte[rowKey.length - PDate.INSTANCE.getByteSize()];
            System.arraycopy(rowKey, 0, rowKeyWithoutTimestamp, 0, 32);
            System.arraycopy(rowKey, 32 + PDate.INSTANCE.getByteSize(), rowKeyWithoutTimestamp, 32, rowKeyWithoutTimestamp.length - 32);
            ImmutableBytesPtr cacheKeyPtr = new ImmutableBytesPtr(rowKeyWithoutTimestamp);
            Cache<ImmutableBytesPtr, Map<String, Object>> cache = CDCCompactionUtil.getSharedRowImageCache(this.config);
            HashMap existingPreImage = (HashMap)cache.getIfPresent((Object)cacheKeyPtr);
            if (existingPreImage == null) {
                existingPreImage = new HashMap();
                cache.put((Object)cacheKeyPtr, existingPreImage);
            }
            Map cdcEvent = CDCCompactionUtil.createTTLDeleteCDCEvent(expiredRowPut, this.dataTable, existingPreImage);
            byte[] cdcEventBytes = JacksonUtil.getObjectWriter(HashMap.class).writeValueAsBytes((Object)cdcEvent);
            Put cdcIndexPut = CDCCompactionUtil.buildCDCIndexPut(this.eventTimestamp, cdcEventBytes, rowKey, cdcIndexMaintainer);
            this.pendingMutations.put(cacheKeyPtr, cdcIndexPut);
        }

        private void flushMutations(List<Put> mutations) throws Exception {
            if (mutations.isEmpty()) {
                return;
            }
            Exception lastException = null;
            for (int retryCount = 0; retryCount < this.cdcTtlMutationMaxRetries; ++retryCount) {
                try (Table cdcIndexTable = this.env.getConnection().getTable(TableName.valueOf((byte[])this.cdcIndex.getPhysicalName().getBytes()));){
                    cdcIndexTable.put(mutations);
                    lastException = null;
                    LOGGER.debug("Successfully flushed batch of {} CDC mutations for table {}", (Object)mutations.size(), (Object)this.tableName);
                    break;
                }
                catch (Exception e) {
                    lastException = e;
                    long backoffMs = 100L;
                    LOGGER.warn("CDC batch mutation attempt {}/{} failed, retrying in {}ms. Batch size: {}", new Object[]{retryCount + 1, this.cdcTtlMutationMaxRetries, backoffMs, mutations.size(), e});
                    try {
                        Thread.sleep(backoffMs);
                        continue;
                    }
                    catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        throw new IOException("Interrupted during CDC batch mutation retry", ie);
                    }
                }
            }
            if (lastException != null) {
                LOGGER.error("Failed to flush CDC batch after {} attempts for table {}, index {}. {} events are missed.", new Object[]{this.cdcTtlMutationMaxRetries, this.tableName, this.cdcIndex.getPhysicalName().getString(), mutations.size(), lastException});
            }
        }

        public void close() throws Exception {
            if (this.pendingMutations.isEmpty()) {
                LOGGER.trace("No CDC mutations to flush for table {}", (Object)this.tableName);
                return;
            }
            int totalMutations = this.pendingMutations.size();
            LOGGER.info("Flushing {} accumulated CDC mutations for table {} in batches of {}", new Object[]{totalMutations, this.tableName, this.batchSize});
            ArrayList<Put> allMutations = new ArrayList<Put>(this.pendingMutations.values());
            for (int i = 0; i < allMutations.size(); i += this.batchSize) {
                int endIndex = Math.min(i + this.batchSize, allMutations.size());
                List<Put> batch = allMutations.subList(i, endIndex);
                this.flushMutations(batch);
                LOGGER.debug("Flushed CDC batch {}/{} for table {} (mutations {}-{} of {})", new Object[]{i / this.batchSize + 1, (allMutations.size() + this.batchSize - 1) / this.batchSize, this.tableName, i + 1, endIndex, totalMutations});
            }
            this.pendingMutations.clear();
            Cache<ImmutableBytesPtr, Map<String, Object>> cache = CDCCompactionUtil.getSharedRowImageCache(this.config);
            LOGGER.info("CDC batch processor closed for table {}. Processed {} mutations in {} batches. Shared cache size: {}", new Object[]{this.tableName, totalMutations, (totalMutations + this.batchSize - 1) / this.batchSize, cache.size()});
        }
    }
}

