package org.apache.hadoop.hbase.master.zksyncer;

import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ZKListener;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.phoenix.shaded.org.apache.yetus.audience.InterfaceAudience;
import org.apache.phoenix.shaded.org.apache.zookeeper.CreateMode;
import org.apache.phoenix.shaded.org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
/* loaded from: input_file:org/apache/hadoop/hbase/master/zksyncer/ClientZKSyncer.class */
public abstract class ClientZKSyncer extends ZKListener {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) ClientZKSyncer.class);
    private final Server server;
    private final ZKWatcher clientZkWatcher;
    private final ConcurrentMap<String, ZKData> queues;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hadoop/hbase/master/zksyncer/ClientZKSyncer$ClientZkUpdater.class */
    public final class ClientZkUpdater extends Thread {
        private final String znode;
        private final ZKData zkData;

        public ClientZkUpdater(String str, ZKData zKData) {
            this.znode = str;
            this.zkData = zKData;
            setName("ClientZKUpdater-" + str);
        }

        /* JADX WARN: Code restructure failed: missing block: B:11:0x0042, code lost:
        
            r4.this$0.deleteDataForClientZkUntilSuccess(r4.znode);
         */
        @Override // java.lang.Thread, java.lang.Runnable
        /*
            Code decompiled incorrectly, please refer to instructions dump.
            To view partially-correct add '--show-bad-code' argument
        */
        public void run() {
            /*
                r4 = this;
                org.slf4j.Logger r0 = org.apache.hadoop.hbase.master.zksyncer.ClientZKSyncer.access$100()
                java.lang.String r1 = "Client zk updater for znode {} started"
                r2 = r4
                java.lang.String r2 = r2.znode
                r0.debug(r1, r2)
            Le:
                r0 = r4
                org.apache.hadoop.hbase.master.zksyncer.ClientZKSyncer r0 = org.apache.hadoop.hbase.master.zksyncer.ClientZKSyncer.this
                org.apache.hadoop.hbase.Server r0 = org.apache.hadoop.hbase.master.zksyncer.ClientZKSyncer.access$200(r0)
                boolean r0 = r0.isStopped()
                if (r0 != 0) goto L67
                r0 = r4
                org.apache.hadoop.hbase.master.zksyncer.ClientZKSyncer$ZKData r0 = r0.zkData     // Catch: java.lang.InterruptedException -> L53
                byte[] r0 = r0.get()     // Catch: java.lang.InterruptedException -> L53
                r5 = r0
                r0 = r5
                if (r0 == 0) goto L38
                r0 = r4
                org.apache.hadoop.hbase.master.zksyncer.ClientZKSyncer r0 = org.apache.hadoop.hbase.master.zksyncer.ClientZKSyncer.this     // Catch: java.lang.InterruptedException -> L53
                r1 = r4
                java.lang.String r1 = r1.znode     // Catch: java.lang.InterruptedException -> L53
                r2 = r5
                org.apache.hadoop.hbase.master.zksyncer.ClientZKSyncer.access$300(r0, r1, r2)     // Catch: java.lang.InterruptedException -> L53
                goto L50
            L38:
                r0 = r4
                org.apache.hadoop.hbase.master.zksyncer.ClientZKSyncer$ZKData r0 = r0.zkData     // Catch: java.lang.InterruptedException -> L53
                boolean r0 = r0.isDeleted()     // Catch: java.lang.InterruptedException -> L53
                if (r0 == 0) goto L50
                r0 = r4
                org.apache.hadoop.hbase.master.zksyncer.ClientZKSyncer r0 = org.apache.hadoop.hbase.master.zksyncer.ClientZKSyncer.this     // Catch: java.lang.InterruptedException -> L53
                r1 = r4
                java.lang.String r1 = r1.znode     // Catch: java.lang.InterruptedException -> L53
                org.apache.hadoop.hbase.master.zksyncer.ClientZKSyncer.access$400(r0, r1)     // Catch: java.lang.InterruptedException -> L53
                goto L67
            L50:
                goto Le
            L53:
                r5 = move-exception
                org.slf4j.Logger r0 = org.apache.hadoop.hbase.master.zksyncer.ClientZKSyncer.access$100()
                java.lang.String r1 = "Interrupted while checking whether need to update meta location to client zk"
                r0.debug(r1)
                java.lang.Thread r0 = java.lang.Thread.currentThread()
                r0.interrupt()
                goto L67
            L67:
                org.slf4j.Logger r0 = org.apache.hadoop.hbase.master.zksyncer.ClientZKSyncer.access$100()
                java.lang.String r1 = "Client zk updater for znode {} stopped"
                r2 = r4
                java.lang.String r2 = r2.znode
                r0.debug(r1, r2)
                return
            */
            throw new UnsupportedOperationException("Method not decompiled: org.apache.hadoop.hbase.master.zksyncer.ClientZKSyncer.ClientZkUpdater.run():void");
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hadoop/hbase/master/zksyncer/ClientZKSyncer$ZKData.class */
    public static final class ZKData {
        byte[] data;
        boolean delete;

        private ZKData() {
            this.delete = false;
        }

        synchronized void set(byte[] bArr) {
            this.data = bArr;
            notifyAll();
        }

        synchronized byte[] get() throws InterruptedException {
            while (!this.delete && this.data == null) {
                wait();
            }
            byte[] bArr = this.data;
            this.data = null;
            return bArr;
        }

        synchronized void delete() {
            this.delete = true;
            notifyAll();
        }

        synchronized boolean isDeleted() {
            return this.delete;
        }
    }

    public ClientZKSyncer(ZKWatcher zKWatcher, ZKWatcher zKWatcher2, Server server) {
        super(zKWatcher);
        this.server = server;
        this.clientZkWatcher = zKWatcher2;
        this.queues = new ConcurrentHashMap();
    }

    private void startNewSyncThread(String str) {
        ZKData zKData = new ZKData();
        this.queues.put(str, zKData);
        ClientZkUpdater clientZkUpdater = new ClientZkUpdater(str, zKData);
        clientZkUpdater.setDaemon(true);
        clientZkUpdater.start();
        watchAndCheckExists(str);
    }

    public void start() throws KeeperException {
        LOG.debug("Starting " + getClass().getSimpleName());
        this.watcher.registerListener(this);
        ZKUtil.createWithParents(this.clientZkWatcher, this.watcher.getZNodePaths().baseZNode);
        Set<String> pathsToWatch = getPathsToWatch();
        LOG.debug("ZNodes to watch: {}", pathsToWatch);
        Iterator<String> it = pathsToWatch.iterator();
        while (it.hasNext()) {
            startNewSyncThread(it.next());
        }
    }

    private void watchAndCheckExists(String str) {
        try {
            if (ZKUtil.watchAndCheckExists(this.watcher, str)) {
                byte[] dataAndWatch = ZKUtil.getDataAndWatch(this.watcher, str);
                if (dataAndWatch != null) {
                    upsertQueue(str, dataAndWatch);
                } else {
                    LOG.debug("Found no data from " + str);
                    watchAndCheckExists(str);
                }
            } else {
                ZKUtil.deleteNodeFailSilent(this.clientZkWatcher, str);
            }
        } catch (KeeperException e) {
            this.server.abort("Unexpected exception during initialization, aborting", e);
        }
    }

    private void upsertQueue(String str, byte[] bArr) {
        ZKData zKData = this.queues.get(str);
        if (zKData != null) {
            zKData.set(bArr);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void setDataForClientZkUntilSuccess(String str, byte[] bArr) throws InterruptedException {
        boolean z = false;
        while (!this.server.isStopped()) {
            try {
                LOG.debug("Set data for remote " + str + ", client zk wather: " + this.clientZkWatcher);
                if (z) {
                    ZKUtil.createNodeIfNotExistsNoWatch(this.clientZkWatcher, str, bArr, CreateMode.PERSISTENT);
                } else {
                    ZKUtil.setData(this.clientZkWatcher, str, bArr);
                }
                return;
            } catch (KeeperException e) {
                LOG.debug("Failed to set data for {} to client ZK, will retry later", str, e);
                if (e.code() == KeeperException.Code.SESSIONEXPIRED) {
                    reconnectAfterExpiration();
                }
                if (e.code() == KeeperException.Code.NONODE) {
                    z = true;
                }
                if (e.code() == KeeperException.Code.NODEEXISTS) {
                    z = false;
                }
                Threads.sleep(200L);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void deleteDataForClientZkUntilSuccess(String str) throws InterruptedException {
        while (!this.server.isStopped()) {
            LOG.debug("Delete remote " + str + ", client zk wather: " + this.clientZkWatcher);
            try {
                ZKUtil.deleteNode(this.clientZkWatcher, str);
                return;
            } catch (KeeperException e) {
                if (e.code() == KeeperException.Code.NONODE) {
                    LOG.debug("Node is already deleted, give up", (Throwable) e);
                    return;
                } else {
                    LOG.debug("Failed to delete node from client ZK, will retry later", (Throwable) e);
                    if (e.code() == KeeperException.Code.SESSIONEXPIRED) {
                        reconnectAfterExpiration();
                    }
                }
            }
        }
    }

    private final void reconnectAfterExpiration() throws InterruptedException {
        LOG.warn("ZK session expired or lost. Retry a new connection...");
        try {
            this.clientZkWatcher.reconnectAfterExpiration();
        } catch (IOException | KeeperException e) {
            LOG.warn("Failed to reconnect to client zk after session expiration, will retry later", e);
        }
    }

    private void getDataAndWatch(String str) {
        try {
            upsertQueue(str, ZKUtil.getDataAndWatch(this.watcher, str));
        } catch (KeeperException e) {
            LOG.warn("Unexpected exception handling nodeCreated event", (Throwable) e);
        }
    }

    private void removeQueue(String str) {
        ZKData remove = this.queues.remove(str);
        if (remove != null) {
            remove.delete();
        }
    }

    @Override // org.apache.hadoop.hbase.zookeeper.ZKListener
    public void nodeCreated(String str) {
        if (validate(str)) {
            getDataAndWatch(str);
        } else {
            removeQueue(str);
        }
    }

    @Override // org.apache.hadoop.hbase.zookeeper.ZKListener
    public void nodeDataChanged(String str) {
        nodeCreated(str);
    }

    @Override // org.apache.hadoop.hbase.zookeeper.ZKListener
    public synchronized void nodeDeleted(String str) {
        if (!validate(str)) {
            removeQueue(str);
            return;
        }
        try {
            if (ZKUtil.watchAndCheckExists(this.watcher, str)) {
                getDataAndWatch(str);
            }
        } catch (KeeperException e) {
            LOG.warn("Unexpected exception handling nodeDeleted event for path: " + str, (Throwable) e);
        }
    }

    protected abstract boolean validate(String str);

    protected abstract Set<String> getPathsToWatch();

    /* JADX INFO: Access modifiers changed from: protected */
    public final void refreshWatchingList() {
        Set<String> pathsToWatch = getPathsToWatch();
        LOG.debug("New ZNodes to watch: {}", pathsToWatch);
        Iterator<Map.Entry<String, ZKData>> it = this.queues.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, ZKData> next = it.next();
            if (!pathsToWatch.contains(next.getKey())) {
                it.remove();
                next.getValue().delete();
            }
        }
        for (String str : pathsToWatch) {
            if (!this.queues.containsKey(str)) {
                startNewSyncThread(str);
            }
        }
    }
}
