/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.replication;

import java.util.ArrayList;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * ZooKeeper-backed implementation of {@link ReplicationQueues}.
 *
 * <p>Znode layout used by this class (paths are built with
 * {@link ZNodePaths#joinZNode(String, String)}):
 * <ul>
 *   <li>{@code queuesZNode/<serverName>/<queueId>/<walName>} - one znode per WAL; the data
 *       written by {@link #setLogPosition} is the serialized position.</li>
 *   <li>{@code hfileRefsZNode/<peerId>/<hfileName>} - one znode per referenced hfile.</li>
 * </ul>
 *
 * <p>{@link #init(String)} must be called before any method that touches this server's queues,
 * because it is the only place {@code myQueuesZnode} is assigned.
 */
@InterfaceAudience.Private
public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements ReplicationQueues {
    private static final Logger LOG = LoggerFactory.getLogger(ReplicationQueuesZKImpl.class);

    /** Znode holding this server's queues; assigned by {@link #init(String)}. */
    private String myQueuesZnode;

    /**
     * Convenience constructor that unpacks the watcher, configuration and abortable from
     * {@code args}.
     *
     * @param args bundle providing the ZooKeeper watcher, configuration and abortable
     */
    public ReplicationQueuesZKImpl(ReplicationQueuesArguments args) {
        this(args.getZk(), args.getConf(), args.getAbortable());
    }

    /**
     * @param zk ZooKeeper watcher handed to the base class
     * @param conf configuration handed to the base class
     * @param abortable used to abort when a ZooKeeper operation fails unrecoverably
     */
    public ReplicationQueuesZKImpl(ZKWatcher zk, Configuration conf, Abortable abortable) {
        super(zk, conf, abortable);
    }

    /**
     * Records this server's queue znode and creates it (with parents) when
     * {@code ZKUtil.checkExists} reports a negative value. When
     * {@code hbase.replication.bulkload.enabled} is true, the hfile-refs znode is created the
     * same way.
     *
     * @param serverName name of this server, used as the child name under the queues znode
     * @throws ReplicationException if either znode could not be checked or created
     */
    @Override
    public void init(String serverName) throws ReplicationException {
        this.myQueuesZnode = ZNodePaths.joinZNode(this.queuesZNode, serverName);
        try {
            if (ZKUtil.checkExists(this.zookeeper, this.myQueuesZnode) < 0) {
                ZKUtil.createWithParents(this.zookeeper, this.myQueuesZnode);
            }
        } catch (KeeperException e) {
            throw new ReplicationException("Could not initialize replication queues.", e);
        }
        if (this.conf.getBoolean("hbase.replication.bulkload.enabled", false)) {
            try {
                if (ZKUtil.checkExists(this.zookeeper, this.hfileRefsZNode) < 0) {
                    ZKUtil.createWithParents(this.zookeeper, this.hfileRefsZNode);
                }
            } catch (KeeperException e) {
                throw new ReplicationException(
                    "Could not initialize hfile references replication queue.", e);
            }
        }
    }

    /**
     * Recursively deletes the given queue under this server's queues znode. Aborts on
     * ZooKeeper failure.
     *
     * @param queueId id of the queue to delete
     */
    @Override
    public void removeQueue(String queueId) {
        try {
            ZKUtil.deleteNodeRecursively(this.zookeeper,
                ZNodePaths.joinZNode(this.myQueuesZnode, queueId));
        } catch (KeeperException e) {
            this.abortable.abort("Failed to delete queue (queueId=" + queueId + ")", e);
        }
    }

    /**
     * Creates the znode for {@code filename} under the given queue, creating parents as needed.
     *
     * @param queueId id of the queue the WAL belongs to
     * @param filename name of the WAL
     * @throws ReplicationException if the znode could not be created; the underlying
     *         {@link KeeperException} is attached as the cause
     */
    @Override
    public void addLog(String queueId, String filename) throws ReplicationException {
        String znode = walZnode(queueId, filename);
        try {
            ZKUtil.createWithParents(this.zookeeper, znode);
        } catch (KeeperException e) {
            // Keep the ZooKeeper failure as the cause so callers can diagnose it.
            throw new ReplicationException(
                "Could not add log because znode could not be created. queueId=" + queueId
                    + ", filename=" + filename, e);
        }
    }

    /**
     * Deletes the znode for {@code filename} from the given queue. Aborts on ZooKeeper failure.
     *
     * @param queueId id of the queue the WAL belongs to
     * @param filename name of the WAL
     */
    @Override
    public void removeLog(String queueId, String filename) {
        try {
            ZKUtil.deleteNode(this.zookeeper, walZnode(queueId, filename));
        } catch (KeeperException e) {
            this.abortable.abort("Failed to remove wal from queue (queueId=" + queueId
                + ", filename=" + filename + ")", e);
        }
    }

    /**
     * Writes {@code position} (serialized via {@code ZKUtil.positionToByteArray}) as the data of
     * the WAL's znode. Aborts on ZooKeeper failure.
     *
     * @param queueId id of the queue the WAL belongs to
     * @param filename name of the WAL
     * @param position position to store
     */
    @Override
    public void setLogPosition(String queueId, String filename, long position) {
        try {
            ZKUtil.setData(this.zookeeper, walZnode(queueId, filename),
                ZKUtil.positionToByteArray(position));
        } catch (KeeperException e) {
            this.abortable.abort("Failed to write replication wal position (filename=" + filename
                + ", position=" + position + ")", e);
        }
    }

    /**
     * Reads and parses the position stored on the WAL's znode.
     *
     * @param queueId id of the queue the WAL belongs to
     * @param filename name of the WAL
     * @return the parsed position; {@code 0} if the thread was interrupted while reading (the
     *         interrupt flag is restored) or if the stored bytes could not be parsed
     * @throws ReplicationException if ZooKeeper fails while reading the znode
     */
    @Override
    public long getLogPosition(String queueId, String filename) throws ReplicationException {
        String znode = walZnode(queueId, filename);
        byte[] bytes;
        try {
            bytes = ZKUtil.getData(this.zookeeper, znode);
        } catch (KeeperException e) {
            throw new ReplicationException(
                "Internal Error: could not get position in log for queueId=" + queueId
                    + ", filename=" + filename, e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return 0L;
        }
        try {
            return ZKUtil.parseWALPositionFrom(bytes);
        } catch (DeserializationException de) {
            // Best effort: report the parse failure (with its cause) and fall back to 0.
            LOG.warn("Failed to parse WALPosition for queueId={} and wal={} znode content, "
                + "continuing.", queueId, filename, de);
            return 0L;
        }
    }

    /**
     * @param regionserver server name to test
     * @return true if the queues znode path for {@code regionserver} equals this server's
     */
    @Override
    public boolean isThisOurRegionServer(String regionserver) {
        return ZNodePaths.joinZNode(this.queuesZNode, regionserver).equals(this.myQueuesZnode);
    }

    /**
     * Lists the queue ids found under another region server's znode.
     *
     * @param regionserver server whose queues should be listed
     * @return the child names of that server's znode; {@code null} if {@code regionserver} is
     *         this server, if the listing itself returned {@code null}, or if ZooKeeper failed
     *         (in which case abort is also requested)
     */
    @Override
    public List<String> getUnClaimedQueueIds(String regionserver) {
        if (this.isThisOurRegionServer(regionserver)) {
            return null;
        }
        String rsZnodePath = ZNodePaths.joinZNode(this.queuesZNode, regionserver);
        List<String> queues = null;
        try {
            queues = ZKUtil.listChildrenNoWatch(this.zookeeper, rsZnodePath);
        } catch (KeeperException e) {
            this.abortable.abort("Failed to getUnClaimedQueueIds for RS" + regionserver, e);
        }
        return queues;
    }

    /**
     * Moves one queue of another region server under this server's queues znode.
     *
     * @param regionserver server that currently owns the queue
     * @param queueId id of the queue to claim
     * @return the new queue id paired with the sorted WAL names that were moved, or
     *         {@code null} if nothing was claimed (see {@link #moveQueueUsingMulti})
     */
    @Override
    public Pair<String, SortedSet<String>> claimQueue(String regionserver, String queueId) {
        LOG.info("Atomically moving {}/{}'s WALs to my queue", regionserver, queueId);
        return this.moveQueueUsingMulti(regionserver, queueId);
    }

    /**
     * Deletes the given region server's znode when it has an (existing but) empty child list.
     * ZooKeeper failures are logged and otherwise ignored.
     *
     * @param regionserver server whose znode should be removed if empty
     */
    @Override
    public void removeReplicatorIfQueueIsEmpty(String regionserver) {
        String rsPath = ZNodePaths.joinZNode(this.queuesZNode, regionserver);
        try {
            List<String> children = ZKUtil.listChildrenNoWatch(this.zookeeper, rsPath);
            if (children != null && children.isEmpty()) {
                ZKUtil.deleteNode(this.zookeeper, rsPath);
            }
        } catch (KeeperException e) {
            LOG.warn("Got error while removing replicator", e);
        }
    }

    /**
     * Recursively deletes this server's queues znode. A session-expired failure is ignored;
     * any other ZooKeeper failure requests an abort.
     */
    @Override
    public void removeAllQueues() {
        try {
            ZKUtil.deleteNodeRecursively(this.zookeeper, this.myQueuesZnode);
        } catch (KeeperException e) {
            if (e instanceof KeeperException.SessionExpiredException) {
                return;
            }
            this.abortable.abort("Failed to delete replication queues for region server: "
                + this.myQueuesZnode, e);
        }
    }

    /**
     * Lists the WAL names in one of this server's queues.
     *
     * @param queueId id of the queue to list
     * @return the child names of the queue znode; {@code null} if the listing returned
     *         {@code null} or ZooKeeper failed (in which case abort is also requested)
     */
    @Override
    public List<String> getLogsInQueue(String queueId) {
        String znode = ZNodePaths.joinZNode(this.myQueuesZnode, queueId);
        List<String> result = null;
        try {
            result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode);
        } catch (KeeperException e) {
            this.abortable.abort("Failed to get list of wals for queueId=" + queueId, e);
        }
        return result;
    }

    /**
     * Lists the ids of all queues under this server's queues znode.
     *
     * @return the queue ids; never {@code null} - an empty list is returned when the listing
     *         returned {@code null} or ZooKeeper failed (in which case abort is also requested)
     */
    @Override
    public List<String> getAllQueues() {
        List<String> listOfQueues = null;
        try {
            listOfQueues = ZKUtil.listChildrenNoWatch(this.zookeeper, this.myQueuesZnode);
        } catch (KeeperException e) {
            this.abortable.abort("Failed to get a list of queues for region server: "
                + this.myQueuesZnode, e);
        }
        return listOfQueues == null ? new ArrayList<>() : listOfQueues;
    }

    /** Builds the znode path of a WAL inside one of this server's queues. */
    private String walZnode(String queueId, String filename) {
        return ZNodePaths.joinZNode(ZNodePaths.joinZNode(this.myQueuesZnode, queueId), filename);
    }

    /**
     * Moves a queue from another region server to this one by submitting a single batch of
     * create/delete operations through {@code ZKUtil.multiOrSequential}.
     *
     * <p>If the peer derived from {@code queueId} (via {@link ReplicationQueueInfo#getPeerId()})
     * does not exist, the old queue and its WAL znodes are deleted instead and {@code null} is
     * returned. Otherwise each WAL znode is re-created, with its stored data, under a new queue
     * named {@code queueId + "-" + regionserver} and the old znodes are deleted.
     *
     * @param regionserver server that currently owns the queue
     * @param queueId id of the queue to move
     * @return the new queue id and the sorted set of moved WAL names (empty when the old queue
     *         had no WALs); {@code null} if the peer does not exist, ZooKeeper failed, or the
     *         thread was interrupted (the interrupt flag is restored)
     */
    private Pair<String, SortedSet<String>> moveQueueUsingMulti(String regionserver,
            String queueId) {
        try {
            String deadRSZnodePath = ZNodePaths.joinZNode(this.queuesZNode, regionserver);
            List<ZKUtil.ZKUtilOp> listOfOps = new ArrayList<>();
            ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(queueId);
            String newQueueId = queueId + "-" + regionserver;
            String newQueueZnode = ZNodePaths.joinZNode(this.myQueuesZnode, newQueueId);
            String oldQueueZnode = ZNodePaths.joinZNode(deadRSZnodePath, queueId);
            // May be null; both branches below have to cope with that.
            List<String> wals = ZKUtil.listChildrenNoWatch(this.zookeeper, oldQueueZnode);

            if (!this.peerExists(replicationQueueInfo.getPeerId())) {
                LOG.warn("Peer {} didn't exist, will move its queue to avoid the failure of "
                    + "multi op", replicationQueueInfo.getPeerId());
                if (wals != null) {
                    for (String wal : wals) {
                        listOfOps.add(ZKUtil.ZKUtilOp.deleteNodeFailSilent(
                            ZNodePaths.joinZNode(oldQueueZnode, wal)));
                    }
                }
                listOfOps.add(ZKUtil.ZKUtilOp.deleteNodeFailSilent(oldQueueZnode));
                ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);
                return null;
            }

            SortedSet<String> logQueue = new TreeSet<>();
            if (wals == null || wals.isEmpty()) {
                // Nothing to move: just drop the old (empty) queue znode.
                listOfOps.add(ZKUtil.ZKUtilOp.deleteNodeFailSilent(oldQueueZnode));
            } else {
                // The new queue znode must be created before its children in the batch.
                listOfOps.add(ZKUtil.ZKUtilOp.createAndFailSilent(newQueueZnode,
                    HConstants.EMPTY_BYTE_ARRAY));
                for (String wal : wals) {
                    String oldWalZnode = ZNodePaths.joinZNode(oldQueueZnode, wal);
                    byte[] logOffset = ZKUtil.getData(this.zookeeper, oldWalZnode);
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Creating {} with data {}", wal, Bytes.toString(logOffset));
                    }
                    String newLogZnode = ZNodePaths.joinZNode(newQueueZnode, wal);
                    listOfOps.add(ZKUtil.ZKUtilOp.createAndFailSilent(newLogZnode, logOffset));
                    listOfOps.add(ZKUtil.ZKUtilOp.deleteNodeFailSilent(oldWalZnode));
                    logQueue.add(wal);
                }
                // The old queue znode is deleted last, after all of its children.
                listOfOps.add(ZKUtil.ZKUtilOp.deleteNodeFailSilent(oldQueueZnode));
                LOG.trace(" The multi list size is: {}", listOfOps.size());
            }
            ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);
            LOG.info("Atomically moved {}/{}'s WALs to my queue", regionserver, queueId);
            return new Pair<>(newQueueId, logQueue);
        } catch (KeeperException e) {
            LOG.warn("Got exception in copyQueuesFromRSUsingMulti: ", e);
        } catch (InterruptedException e) {
            LOG.warn("Got exception in copyQueuesFromRSUsingMulti: ", e);
            Thread.currentThread().interrupt();
        }
        return null;
    }

    /**
     * Adds one znode per hfile under the peer's hfile-refs znode, named after the file name of
     * the second path of each pair, in a single {@code ZKUtil.multiOrSequential} call.
     *
     * @param peerId id of the peer the references belong to
     * @param pairs path pairs; only {@code getSecond().getName()} of each pair is used here
     * @throws ReplicationException if ZooKeeper fails while creating the znodes
     */
    @Override
    public void addHFileRefs(String peerId, List<Pair<Path, Path>> pairs)
            throws ReplicationException {
        String peerZnode = ZNodePaths.joinZNode(this.hfileRefsZNode, peerId);
        LOG.debug("Adding hfile references {} in queue {}", pairs, peerZnode);

        List<ZKUtil.ZKUtilOp> listOfOps = new ArrayList<>(pairs.size());
        for (Pair<Path, Path> pair : pairs) {
            String hfileZnode = ZNodePaths.joinZNode(peerZnode, pair.getSecond().getName());
            listOfOps.add(ZKUtil.ZKUtilOp.createAndFailSilent(hfileZnode,
                HConstants.EMPTY_BYTE_ARRAY));
        }
        LOG.debug(" The multi list size for adding hfile references in zk for node {} is {}",
            peerZnode, listOfOps.size());
        try {
            ZKUtil.multiOrSequential(this.zookeeper, listOfOps, true);
        } catch (KeeperException e) {
            throw new ReplicationException(
                "Failed to create hfile reference znode=" + e.getPath(), e);
        }
    }

    /**
     * Deletes the given hfile reference znodes under the peer's hfile-refs znode in a single
     * {@code ZKUtil.multiOrSequential} call. ZooKeeper failures are logged and otherwise ignored.
     *
     * @param peerId id of the peer the references belong to
     * @param files names of the hfile reference znodes to delete
     */
    @Override
    public void removeHFileRefs(String peerId, List<String> files) {
        String peerZnode = ZNodePaths.joinZNode(this.hfileRefsZNode, peerId);
        LOG.debug("Removing hfile references {} from queue {}", files, peerZnode);

        List<ZKUtil.ZKUtilOp> listOfOps = new ArrayList<>(files.size());
        for (String file : files) {
            listOfOps.add(ZKUtil.ZKUtilOp.deleteNodeFailSilent(
                ZNodePaths.joinZNode(peerZnode, file)));
        }
        LOG.debug(" The multi list size for removing hfile references in zk for node {} is {}",
            peerZnode, listOfOps.size());
        try {
            ZKUtil.multiOrSequential(this.zookeeper, listOfOps, true);
        } catch (KeeperException e) {
            LOG.error("Failed to remove hfile reference znode=" + e.getPath(), e);
        }
    }

    /**
     * Creates the peer's znode under the hfile-refs znode when {@code ZKUtil.checkExists}
     * returns {@code -1} for it.
     *
     * @param peerId id of the peer to add
     * @throws ReplicationException if ZooKeeper fails while checking or creating the znode
     */
    @Override
    public void addPeerToHFileRefs(String peerId) throws ReplicationException {
        String peerZnode = ZNodePaths.joinZNode(this.hfileRefsZNode, peerId);
        try {
            if (ZKUtil.checkExists(this.zookeeper, peerZnode) == -1) {
                LOG.info("Adding peer {} to hfile reference queue.", peerId);
                ZKUtil.createWithParents(this.zookeeper, peerZnode);
            }
        } catch (KeeperException e) {
            throw new ReplicationException(
                "Failed to add peer " + peerId + " to hfile reference queue.", e);
        }
    }

    /**
     * Recursively deletes the peer's znode under the hfile-refs znode, unless
     * {@code ZKUtil.checkExists} returns {@code -1} for it. ZooKeeper failures are logged and
     * otherwise ignored.
     *
     * @param peerId id of the peer to remove
     */
    @Override
    public void removePeerFromHFileRefs(String peerId) {
        String peerZnode = ZNodePaths.joinZNode(this.hfileRefsZNode, peerId);
        try {
            if (ZKUtil.checkExists(this.zookeeper, peerZnode) == -1) {
                LOG.debug("Peer {} not found in hfile reference queue.", peerZnode);
                return;
            }
            LOG.info("Removing peer {} from hfile reference queue.", peerZnode);
            ZKUtil.deleteNodeRecursively(this.zookeeper, peerZnode);
        } catch (KeeperException e) {
            LOG.error("Ignoring the exception to remove peer " + peerId
                + " from hfile reference queue.", e);
        }
    }
}

