package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.PriorityBlockingQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.io.hfile.bucket.FileIOEngine;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSource;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link ReplicationSource} variant used for recovered replication queues.
 *
 * <p>Compared to its parent, this class:
 * <ul>
 *   <li>starts {@link RecoveredReplicationSourceShipper} /
 *       {@link RecoveredReplicationSourceWALReader} workers instead of the regular ones;</li>
 *   <li>can re-resolve WAL paths that no longer exist at their queued location
 *       ({@link #locateRecoveredPaths(PriorityBlockingQueue)});</li>
 *   <li>asks the manager to close the recovered queue once every shipper reports that it is
 *       finished ({@link #tryFinish()}).</li>
 * </ul>
 */
@InterfaceAudience.Private
public class RecoveredReplicationSource extends ReplicationSource {
    private static final Logger LOG = LoggerFactory.getLogger(RecoveredReplicationSource.class);

    /** Peer id taken from {@code replicationQueueInfo} at the end of {@code init}. */
    private String actualPeerId;

    /**
     * Delegates to the parent initialisation and then caches the peer id reported by
     * {@code replicationQueueInfo}, which is what {@link #getPeerId()} returns.
     *
     * @throws IOException if the parent initialisation fails
     */
    @Override
    public void init(Configuration configuration, FileSystem fileSystem, ReplicationSourceManager replicationSourceManager, ReplicationQueues replicationQueues, ReplicationPeers replicationPeers, Server server, String str, UUID uuid, ReplicationEndpoint replicationEndpoint, WALFileLengthProvider wALFileLengthProvider, MetricsSource metricsSource) throws IOException {
        super.init(configuration, fileSystem, replicationSourceManager, replicationQueues, replicationPeers, server, str, uuid, replicationEndpoint, wALFileLengthProvider, metricsSource);
        this.actualPeerId = this.replicationQueueInfo.getPeerId();
    }

    /**
     * Registers and starts a recovered-queue shipper (plus its WAL reader) for a WAL group,
     * unless a worker is already registered for that group.
     *
     * @param str the WAL group identifier the worker is keyed by
     * @param priorityBlockingQueue the queue of WAL paths handed to the shipper and reader
     */
    @Override
    protected void tryStartNewShipper(String str, PriorityBlockingQueue<Path> priorityBlockingQueue) {
        RecoveredReplicationSourceShipper worker = new RecoveredReplicationSourceShipper(this.conf, str, priorityBlockingQueue, this, this.replicationQueues);
        ReplicationSourceShipper existing = this.workerThreads.putIfAbsent(str, worker);
        if (existing != null) {
            LOG.debug("Someone has beat us to start a worker thread for wal group {}", str);
            return;
        }
        LOG.debug("Starting up worker for wal group {}", str);
        worker.startup(getUncaughtExceptionHandler());
        worker.setWALReader(startNewWALReader(worker.getName(), str, priorityBlockingQueue, worker.getStartPosition()));
        // NOTE(review): this put looks redundant after the successful putIfAbsent above; it is
        // kept so the registration behaviour stays exactly as before.
        this.workerThreads.put(str, worker);
    }

    /**
     * Creates a {@link RecoveredReplicationSourceWALReader} and starts it as a daemon thread.
     *
     * @param str prefix used when building the reader thread's name
     * @param str2 the WAL group identifier, also embedded in the thread name
     * @param priorityBlockingQueue the queue of WAL paths the reader consumes
     * @param j the start position passed to the reader
     * @return the started reader
     */
    @Override
    protected ReplicationSourceWALReader startNewWALReader(String str, String str2, PriorityBlockingQueue<Path> priorityBlockingQueue, long j) {
        RecoveredReplicationSourceWALReader walReader = new RecoveredReplicationSourceWALReader(this.fs, this.conf, priorityBlockingQueue, j, this.walEntryFilter, this);
        String threadName = str + ".replicationSource.replicationWALReaderThread." + str2 + FileIOEngine.FILE_DELIMITER + this.peerClusterZnode;
        Threads.setDaemonThreadRunning(walReader, threadName, getUncaughtExceptionHandler());
        return walReader;
    }

    /**
     * Re-resolves every queued WAL path that no longer exists on the file system and, if at
     * least one path had to be looked up, replaces the queue content with the resolved paths.
     *
     * <p>Each queued path produces exactly one entry in the rebuilt queue: the path itself when
     * it still exists, its new location when one is found, or the original path when no new
     * location can be found.
     *
     * @param priorityBlockingQueue the queue of WAL paths to verify; modified in place
     * @throws IOException if a file system call fails or the rebuilt queue does not have the
     *     same size as the original one
     */
    public void locateRecoveredPaths(PriorityBlockingQueue<Path> priorityBlockingQueue) throws IOException {
        boolean pathChanged = false;
        PriorityBlockingQueue<Path> resolvedPaths = new PriorityBlockingQueue<>(this.queueSizePerGroup, new ReplicationSource.LogsComparator());
        for (Path walPath : priorityBlockingQueue) {
            if (this.fs.exists(walPath)) {
                // Still where the queue says it is; nothing to resolve.
                resolvedPaths.add(walPath);
                continue;
            }
            pathChanged = true;
            if (this.server instanceof ReplicationSyncUp.DummyServer) {
                resolvedPaths.add(getReplSyncUpPath(walPath));
                continue;
            }
            Path newLocation = findInDeadServerDirs(walPath);
            if (newLocation == null) {
                LOG.error("WAL Path {} doesn't exist and couldn't find its new location", walPath);
                // Keep the stale path so the queue size stays consistent.
                newLocation = walPath;
            }
            resolvedPaths.add(newLocation);
        }
        if (!pathChanged) {
            return;
        }
        if (resolvedPaths.size() != priorityBlockingQueue.size()) {
            LOG.error("Recovery queue size is incorrect");
            throw new IOException("Recovery queue size error");
        }
        priorityBlockingQueue.clear();
        priorityBlockingQueue.addAll(resolvedPaths);
    }

    /**
     * Looks for a WAL file, by name, in the WAL directory of each dead region server recorded in
     * {@code replicationQueueInfo}, checking both the plain directory and the directory with the
     * splitting suffix. Returns as soon as the first existing location is found so that a single
     * WAL never yields more than one result.
     *
     * @param walPath the queued WAL path that no longer exists
     * @return the first existing candidate location, or {@code null} if none exists
     * @throws IOException if a file system call fails
     */
    private Path findInDeadServerDirs(Path walPath) throws IOException {
        List<ServerName> deadRegionServers = this.replicationQueueInfo.getDeadRegionServers();
        LOG.info("NB dead servers : {}", deadRegionServers.size());
        Path walRootDir = FSUtils.getWALRootDir(this.conf);
        for (ServerName deadServer : deadRegionServers) {
            Path deadServerDir = new Path(walRootDir, AbstractFSWALProvider.getWALDirectoryName(deadServer.getServerName()));
            Path[] candidates = {
                new Path(deadServerDir, walPath.getName()),
                new Path(deadServerDir.suffix(AbstractFSWALProvider.SPLITTING_EXT), walPath.getName())
            };
            for (Path candidate : candidates) {
                LOG.info("Possible location {}", candidate.toUri().toString());
                if (this.manager.getFs().exists(candidate)) {
                    LOG.info("Log {} still exists at {}", walPath, candidate);
                    return candidate;
                }
            }
        }
        return null;
    }

    /**
     * Searches the sub-directories of the manager's log directory for a file with the same name
     * as {@code path}.
     *
     * @param path the WAL path whose file name is searched for
     * @return the matching file located directly under its containing sub-directory, or
     *     {@code path} itself if no file with that name is found
     * @throws IOException if listing a directory fails
     */
    private Path getReplSyncUpPath(Path path) throws IOException {
        for (FileStatus serverDirStatus : this.fs.listStatus(this.manager.getLogDir())) {
            Path serverDir = serverDirStatus.getPath();
            for (FileStatus logStatus : this.fs.listStatus(serverDir)) {
                // Always resolve against the directory itself; building on the previous
                // candidate would produce nested paths such as dir/log1/log2.
                Path candidate = new Path(serverDir, logStatus.getPath().getName());
                if (candidate.getName().equals(path.getName())) {
                    LOG.info("Log {} found at {}", candidate.getName(), candidate);
                    return candidate;
                }
            }
        }
        LOG.error("Didn't find path for: {}", path.getName());
        return path;
    }

    /**
     * Asks the manager to close this recovered queue if every registered shipper reports that it
     * is finished. The check runs while holding the {@code workerThreads} monitor, after a short
     * sleep.
     */
    public void tryFinish() {
        synchronized (this.workerThreads) {
            Threads.sleep(100L);
            boolean allFinished = true;
            for (ReplicationSourceShipper worker : this.workerThreads.values()) {
                if (!worker.isFinished()) {
                    allFinished = false;
                    break;
                }
            }
            if (allFinished) {
                this.manager.closeRecoveredQueue(this);
                LOG.info("Finished recovering queue {} with the following stats: {}", this.peerClusterZnode, getStats());
            }
        }
    }

    /**
     * @return the peer id captured from {@code replicationQueueInfo} during {@code init}
     */
    @Override
    public String getPeerId() {
        return this.actualPeerId;
    }

    /**
     * @return the first entry of the dead region server list held by {@code replicationQueueInfo}
     */
    @Override
    public ServerName getServerWALsBelongTo() {
        List<ServerName> deadRegionServers = this.replicationQueueInfo.getDeadRegionServers();
        return deadRegionServers.get(0);
    }
}
