/*
 * Decompiled with CFR 0.152.
 */
package org.apache.phoenix.coprocessor;

import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Types;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.MasterObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.coprocessorclient.metrics.MetricsPhoenixCoprocessorSourceFactory;
import org.apache.phoenix.coprocessorclient.metrics.MetricsPhoenixMasterSource;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.ParentPartitionNotFound;
import org.apache.phoenix.schema.TableNotFoundException;
import org.apache.phoenix.util.CDCUtil;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.QueryUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Master coprocessor that keeps the CDC stream partition rows stored in
 * {@code PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_NAME} up to date when regions of a table
 * with an enabled stream are split or merged.
 *
 * <p>After a split or merge completes, a row is upserted for every (daughter, parent) pair and
 * the end time of every parent partition row is set to the daughter's region id.
 */
public class PhoenixMasterObserver
implements MasterObserver,
MasterCoprocessor {
    private static final Logger LOGGER = LoggerFactory.getLogger(PhoenixMasterObserver.class);

    /** Configuration key for how many times a failed partition metadata update is retried. */
    public static final String PHOENIX_MASTER_MAX_RETRY_COUNT = "phoenix.master.observer.max.retry.count";

    /** Retry count used when {@link #PHOENIX_MASTER_MAX_RETRY_COUNT} is not configured. */
    public static final int DEFAULT_PHOENIX_MASTER_MAX_RETRY_COUNT = 20;

    /** Looks up the name of the stream in ENABLED status for a table. */
    private static final String STREAM_STATUS_QUERY = "SELECT STREAM_NAME FROM " + PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_STATUS_NAME + " WHERE TABLE_NAME = ? AND STREAM_STATUS='" + CDCUtil.CdcStreamStatus.ENABLED.getSerializedValue() + "'";

    /** Upserts a complete (nine column) partition row. */
    private static final String PARTITION_UPSERT_SQL = "UPSERT INTO " + PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_NAME + " VALUES (?,?,?,?,?,?,?,?,?)";

    /** Base query for open partitions; start/end key predicates are appended by the caller. */
    private static final String PARENT_PARTITION_QUERY_FOR_SPLIT = "SELECT PARTITION_ID, PARENT_PARTITION_ID, PARTITION_START_TIME FROM " + PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_NAME + " WHERE TABLE_NAME = ? AND STREAM_NAME = ? AND PARTITION_END_TIME IS NULL ";

    /** Finds the parent partition ids recorded for one known partition id. */
    private static final String PARENT_PARTITION_QUERY_FOR_MERGE = "SELECT PARENT_PARTITION_ID FROM " + PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_NAME + " WHERE TABLE_NAME = ? AND STREAM_NAME = ? AND PARTITION_ID = ?";

    /** Sets the end time on one (partition, parent partition) row. */
    private static final String PARENT_PARTITION_UPDATE_END_TIME_SQL = "UPSERT INTO " + PhoenixDatabaseMetaData.SYSTEM_CDC_STREAM_NAME + " (TABLE_NAME, STREAM_NAME, PARTITION_ID, PARENT_PARTITION_ID, PARTITION_END_TIME) VALUES (?,?,?,?,?)";

    /** Metrics sink for failed updates; assigned in {@link #start(CoprocessorEnvironment)}. */
    private MetricsPhoenixMasterSource metricSource;

    /**
     * Exposes this instance as the master observer.
     *
     * @return an {@link Optional} wrapping {@code this}
     */
    public Optional<MasterObserver> getMasterObserver() {
        return Optional.of(this);
    }

    /**
     * Obtains the master metrics source from the coprocessor metrics factory.
     *
     * @param env coprocessor environment (not used)
     * @throws IOException declared by the signature; not thrown by this implementation
     */
    public void start(CoprocessorEnvironment env) throws IOException {
        this.metricSource = MetricsPhoenixCoprocessorSourceFactory.getInstance().getPhoenixMasterSource();
    }

    /**
     * Updates partition metadata after a region split has completed.
     *
     * <p>Steps: resolve the Phoenix table and its enabled stream (returning quietly if either is
     * missing), find the parent partition from the daughters' key range, upsert a row for each
     * daughter, then set the end time on the parent partition rows. Any failure other than
     * {@link ParentPartitionNotFound} or {@link TableNotFoundException} is retried on a fresh
     * connection.
     *
     * @param c observer context supplying the configuration
     * @param regionInfoA first daughter; its start key is used as the parent's start key
     * @param regionInfoB second daughter; its end key is used as the parent's end key
     * @throws IOException when the update still fails after the configured number of retries
     */
    public void postCompletedSplitRegionAction(ObserverContext<MasterCoprocessorEnvironment> c, RegionInfo regionInfoA, RegionInfo regionInfoB) throws IOException {
        Configuration conf = c.getEnvironment().getConfiguration();
        int maxRetryCount = conf.getInt(PHOENIX_MASTER_MAX_RETRY_COUNT, DEFAULT_PHOENIX_MASTER_MAX_RETRY_COUNT);
        int tries = 0;
        while (true) {
            try (Connection conn = QueryUtil.getConnectionOnServer(conf)) {
                PTable phoenixTable = getPhoenixTable(conn, regionInfoA.getTable());
                if (phoenixTable == null) {
                    LOGGER.debug("{} is not a Phoenix Table, skipping partition metadata update.", regionInfoA.getTable());
                    return;
                }
                String tableName = phoenixTable.getName().getString();
                String streamName = getStreamName(conn, tableName);
                if (streamName == null) {
                    LOGGER.debug("{} does not have a stream enabled, skipping partition metadata update.", regionInfoA.getTable());
                    return;
                }
                LOGGER.info("Updating split partition metadata for table={}, stream={} daughters {} {}", tableName, streamName, regionInfoA.getEncodedName(), regionInfoB.getEncodedName());
                Pair<List<String>, List<Long>> ancestorInfo = getAncestorIdsForSplit(conn, tableName, streamName, regionInfoA, regionInfoB);
                List<String> ancestorIDs = ancestorInfo.getFirst();
                // Element 0 is the parent partition; the remaining ids are the parent's own parents.
                upsertDaughterPartitions(conn, tableName, streamName, ancestorIDs.subList(0, 1), Arrays.asList(regionInfoA, regionInfoB), ancestorInfo.getSecond());
                updateParentPartitionEndTime(conn, tableName, streamName, ancestorIDs, regionInfoA.getRegionId());
                return;
            }
            catch (ParentPartitionNotFound e) {
                LOGGER.debug("Parent partition not found, skipping remaining steps.", e);
                return;
            }
            catch (TableNotFoundException e) {
                LOGGER.warn("System Table not found during region split completion. This must happen before metadata upgrade only.", e);
                return;
            }
            catch (Exception e) {
                LOGGER.error("Try: {}. Unable to update CDC Stream Partition metadata during split with daughter regions: {} {}", tries, regionInfoA.getEncodedName(), regionInfoB.getEncodedName(), e);
                this.metricSource.incrementPostSplitPartitionUpdateFailureCount();
                // One initial attempt plus maxRetryCount retries; give up after that.
                if (tries++ >= maxRetryCount) {
                    throw new IOException("Failed to update CDC Stream Partition metadata after " + maxRetryCount + " retries during split. Daughter regions: " + regionInfoA.getEncodedName() + " " + regionInfoB.getEncodedName(), e);
                }
            }
        }
    }

    /**
     * Updates partition metadata after a region merge has completed.
     *
     * <p>Steps: resolve the Phoenix table and its enabled stream (returning quietly if either is
     * missing), upsert one row for the merged region per merged parent, then set the end time on
     * each parent's partition rows. Any failure other than {@link ParentPartitionNotFound} or
     * {@link TableNotFoundException} is retried on a fresh connection.
     *
     * @param c observer context supplying the configuration
     * @param regionsToMerge the regions that were merged (the parents)
     * @param mergedRegion the region produced by the merge (the daughter)
     * @throws IOException when the update still fails after the configured number of retries
     */
    public void postCompletedMergeRegionsAction(ObserverContext<MasterCoprocessorEnvironment> c, RegionInfo[] regionsToMerge, RegionInfo mergedRegion) throws IOException {
        Configuration conf = c.getEnvironment().getConfiguration();
        int maxRetryCount = conf.getInt(PHOENIX_MASTER_MAX_RETRY_COUNT, DEFAULT_PHOENIX_MASTER_MAX_RETRY_COUNT);
        int tries = 0;
        while (true) {
            try (Connection conn = QueryUtil.getConnectionOnServer(conf)) {
                PTable phoenixTable = getPhoenixTable(conn, mergedRegion.getTable());
                if (phoenixTable == null) {
                    LOGGER.debug("{} is not a Phoenix Table, skipping partition metadata update.", mergedRegion.getTable());
                    return;
                }
                String tableName = phoenixTable.getName().getString();
                String streamName = getStreamName(conn, tableName);
                if (streamName == null) {
                    LOGGER.debug("{} does not have a stream enabled, skipping partition metadata update.", mergedRegion.getTable());
                    return;
                }
                LOGGER.info("Updating merged partition metadata for table={}, stream={} daughter {}", tableName, streamName, mergedRegion.getEncodedName());
                List<String> parentPartitionIDs = Arrays.stream(regionsToMerge).map(RegionInfo::getEncodedName).collect(Collectors.toList());
                List<Long> parentPartitionStartTimes = Arrays.stream(regionsToMerge).map(RegionInfo::getRegionId).collect(Collectors.toList());
                upsertDaughterPartitions(conn, tableName, streamName, parentPartitionIDs, Collections.singletonList(mergedRegion), parentPartitionStartTimes);
                for (RegionInfo ri : regionsToMerge) {
                    List<String> ancestorIDs = getAncestorIdsForMerge(conn, tableName, streamName, ri);
                    updateParentPartitionEndTime(conn, tableName, streamName, ancestorIDs, mergedRegion.getRegionId());
                }
                return;
            }
            catch (ParentPartitionNotFound e) {
                LOGGER.debug("Parent partition not found, skipping remaining steps.", e);
                return;
            }
            catch (TableNotFoundException e) {
                LOGGER.warn("System Table not found during region merge completion. This must happen before metadata upgrade only.", e);
                return;
            }
            catch (Exception e) {
                LOGGER.error("Try: {}. Unable to update CDC Stream Partition metadata during merge with parent regions: {} and daughter region {}", tries, regionsToMerge, mergedRegion.getEncodedName(), e);
                this.metricSource.incrementPostMergePartitionUpdateFailureCount();
                // One initial attempt plus maxRetryCount retries; give up after that.
                if (tries++ >= maxRetryCount) {
                    throw new IOException("Failed to update CDC Stream Partition metadata after " + maxRetryCount + " retries during merge with parent regions: " + Arrays.toString(regionsToMerge) + " and daughter region: " + mergedRegion.getEncodedName(), e);
                }
            }
        }
    }

    /**
     * Finds the open (no end time) partition whose key range spans both split daughters.
     *
     * <p>The parent's start key is daughter A's start key and its end key is daughter B's end
     * key. An empty key is matched with {@code IS NULL} rather than a bind parameter.
     *
     * @param conn connection used for the lookup
     * @param tableName table name to match
     * @param streamName stream name to match
     * @param regionInfoA first daughter region
     * @param regionInfoB second daughter region
     * @return a pair whose first list holds the parent partition id followed by the parent
     *     partition ids of every matching row, and whose second list holds the partition start
     *     time read from the first matching row
     * @throws ParentPartitionNotFound when no row matches
     * @throws SQLException when the query fails
     */
    protected Pair<List<String>, List<Long>> getAncestorIdsForSplit(Connection conn, String tableName, String streamName, RegionInfo regionInfoA, RegionInfo regionInfoB) throws SQLException {
        byte[] parentStartKey = regionInfoA.getStartKey();
        byte[] parentEndKey = regionInfoB.getEndKey();
        StringBuilder qb = new StringBuilder(PARENT_PARTITION_QUERY_FOR_SPLIT);
        qb.append(parentStartKey.length == 0 ? " AND PARTITION_START_KEY IS NULL " : " AND PARTITION_START_KEY = ? ");
        qb.append(parentEndKey.length == 0 ? " AND PARTITION_END_KEY IS NULL " : " AND PARTITION_END_KEY = ? ");
        List<String> ancestorIDs = new ArrayList<>();
        List<Long> parentPartitionStartTimes = new ArrayList<>();
        try (PreparedStatement pstmt = conn.prepareStatement(qb.toString())) {
            int index = 1;
            pstmt.setString(index++, tableName);
            pstmt.setString(index++, streamName);
            // Bind only the keys for which a '?' placeholder was appended above.
            if (parentStartKey.length > 0) {
                pstmt.setBytes(index++, parentStartKey);
            }
            if (parentEndKey.length > 0) {
                pstmt.setBytes(index++, parentEndKey);
            }
            LOGGER.info("Query to get parent partition id: {}", pstmt);
            try (ResultSet rs = pstmt.executeQuery()) {
                if (!rs.next()) {
                    throw new ParentPartitionNotFound(String.format("Could not find parent of the provided daughters: startKeyA=%s endKeyA=%s startKeyB=%s endKeyB=%s", Bytes.toStringBinary(regionInfoA.getStartKey()), Bytes.toStringBinary(regionInfoA.getEndKey()), Bytes.toStringBinary(regionInfoB.getStartKey()), Bytes.toStringBinary(regionInfoB.getEndKey())));
                }
                ancestorIDs.add(rs.getString(1));
                ancestorIDs.add(rs.getString(2));
                parentPartitionStartTimes.add(rs.getLong(3));
                // Further rows contribute only their parent partition id.
                while (rs.next()) {
                    ancestorIDs.add(rs.getString(2));
                }
            }
        }
        return new Pair<>(ancestorIDs, parentPartitionStartTimes);
    }

    /**
     * Collects the partition id of a merged (parent) region and its recorded parent partition ids.
     *
     * @param conn connection used for the lookup
     * @param tableName table name to match
     * @param streamName stream name to match
     * @param parent one of the regions that were merged
     * @return a list starting with {@code parent}'s encoded name, followed by the parent
     *     partition id of every row stored for that partition
     * @throws ParentPartitionNotFound when no row exists for {@code parent}
     * @throws SQLException when the query fails
     */
    protected List<String> getAncestorIdsForMerge(Connection conn, String tableName, String streamName, RegionInfo parent) throws SQLException {
        List<String> ancestorIDs = new ArrayList<>();
        ancestorIDs.add(parent.getEncodedName());
        try (PreparedStatement pstmt = conn.prepareStatement(PARENT_PARTITION_QUERY_FOR_MERGE)) {
            pstmt.setString(1, tableName);
            pstmt.setString(2, streamName);
            pstmt.setString(3, parent.getEncodedName());
            try (ResultSet rs = pstmt.executeQuery()) {
                if (!rs.next()) {
                    throw new ParentPartitionNotFound(String.format("Could not find parent of the provided merged region: %s", parent.getEncodedName()));
                }
                do {
                    ancestorIDs.add(rs.getString(1));
                } while (rs.next());
            }
        }
        return ancestorIDs;
    }

    /**
     * Upserts one partition row per (daughter, parent) combination and commits once at the end.
     *
     * @param conn connection to write through; auto-commit is switched off on it
     * @param tableName table name stored in each row
     * @param streamName stream name stored in each row
     * @param parentPartitionIDs parent partition ids; one row is written per id for each daughter
     * @param daughters daughter regions supplying partition id, start time and key range
     * @param parentPartitionStartTimes start times, index-aligned with {@code parentPartitionIDs}
     * @throws SQLException when an upsert or the commit fails
     */
    private void upsertDaughterPartitions(Connection conn, String tableName, String streamName, List<String> parentPartitionIDs, List<RegionInfo> daughters, List<Long> parentPartitionStartTimes) throws SQLException {
        conn.setAutoCommit(false);
        try (PreparedStatement pstmt = conn.prepareStatement(PARTITION_UPSERT_SQL)) {
            for (RegionInfo daughter : daughters) {
                String partitionId = daughter.getEncodedName();
                long startTime = daughter.getRegionId();
                byte[] startKey = daughter.getStartKey();
                byte[] endKey = daughter.getEndKey();
                for (int i = 0; i < parentPartitionIDs.size(); ++i) {
                    pstmt.setString(1, tableName);
                    pstmt.setString(2, streamName);
                    pstmt.setString(3, partitionId);
                    pstmt.setString(4, parentPartitionIDs.get(i));
                    pstmt.setLong(5, startTime);
                    // Sixth column is written as NULL for a newly created partition.
                    pstmt.setNull(6, Types.BIGINT);
                    // Empty region boundary keys are stored as NULL.
                    pstmt.setBytes(7, startKey.length == 0 ? null : startKey);
                    pstmt.setBytes(8, endKey.length == 0 ? null : endKey);
                    pstmt.setLong(9, parentPartitionStartTimes.get(i));
                    pstmt.executeUpdate();
                }
            }
            conn.commit();
        }
    }

    /**
     * Sets the end time on every (partition, parent partition) row of a closed partition.
     *
     * @param conn connection to write through; auto-commit is switched off on it
     * @param tableName table name of the rows to update
     * @param streamName stream name of the rows to update
     * @param ancestorIDs element 0 is the partition being closed; each following element is one
     *     of its parent partition ids and yields one upsert
     * @param daughterStartTime value written as the partition end time
     * @throws SQLException when an upsert or the commit fails
     */
    private void updateParentPartitionEndTime(Connection conn, String tableName, String streamName, List<String> ancestorIDs, long daughterStartTime) throws SQLException {
        conn.setAutoCommit(false);
        try (PreparedStatement pstmt = conn.prepareStatement(PARENT_PARTITION_UPDATE_END_TIME_SQL)) {
            // Index 0 is the partition itself, so parent ids start at index 1.
            for (int i = 1; i < ancestorIDs.size(); ++i) {
                pstmt.setString(1, tableName);
                pstmt.setString(2, streamName);
                pstmt.setString(3, ancestorIDs.get(0));
                pstmt.setString(4, ancestorIDs.get(i));
                pstmt.setLong(5, daughterStartTime);
                pstmt.executeUpdate();
            }
            conn.commit();
        }
    }

    /**
     * Returns the name of the stream in ENABLED status for a table.
     *
     * @param conn connection used for the lookup
     * @param tableName table name to match
     * @return the stream name from the first matching row, or {@code null} when there is none
     * @throws SQLException when the query fails
     */
    private String getStreamName(Connection conn, String tableName) throws SQLException {
        try (PreparedStatement pstmt = conn.prepareStatement(STREAM_STATUS_QUERY)) {
            pstmt.setString(1, tableName);
            try (ResultSet rs = pstmt.executeQuery()) {
                return rs.next() ? rs.getString(1) : null;
            }
        }
    }

    /**
     * Resolves the Phoenix table for an HBase table name.
     *
     * @param conn connection used for the lookup
     * @param tableName HBase table name
     * @return the Phoenix table, or {@code null} when the lookup raises
     *     {@link TableNotFoundException}
     * @throws SQLException for any other lookup failure
     */
    private PTable getPhoenixTable(Connection conn, TableName tableName) throws SQLException {
        try {
            return PhoenixRuntime.getTable(conn, tableName.toString());
        }
        catch (TableNotFoundException e) {
            return null;
        }
    }
}

