/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.federation.router;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.router.MountTableRefresherThread;
import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
import org.apache.hadoop.hdfs.server.federation.router.Router;
import org.apache.hadoop.hdfs.server.federation.router.RouterClient;
import org.apache.hadoop.hdfs.server.federation.router.RouterServiceState;
import org.apache.hadoop.hdfs.server.federation.store.MountTableStore;
import org.apache.hadoop.hdfs.server.federation.store.RouterStore;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreUnavailableException;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils;
import org.apache.hadoop.hdfs.server.federation.store.records.RouterState;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.hadoop.thirdparty.com.google.common.cache.CacheLoader;
import org.apache.hadoop.thirdparty.com.google.common.cache.LoadingCache;
import org.apache.hadoop.thirdparty.com.google.common.cache.RemovalListener;
import org.apache.hadoop.thirdparty.com.google.common.cache.RemovalNotification;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MountTableRefresherService
extends AbstractService {
    private static final String ROUTER_CONNECT_ERROR_MSG = "Router {} connection failed. Mount table cache will not refresh.";
    private static final Logger LOG = LoggerFactory.getLogger(MountTableRefresherService.class);
    private final Router router;
    private MountTableStore mountTableStore;
    private String localAdminAddress;
    private long cacheUpdateTimeout;
    private LoadingCache<String, RouterClient> routerClientsCache;
    private ScheduledExecutorService clientCacheCleanerScheduler;

    public MountTableRefresherService(Router router) {
        super(MountTableRefresherService.class.getSimpleName());
        this.router = router;
    }

    protected void serviceInit(Configuration conf) throws Exception {
        super.serviceInit(conf);
        this.mountTableStore = this.getMountTableStore();
        this.mountTableStore.setRefreshService(this);
        this.localAdminAddress = conf.getBoolean("dfs.federation.router.heartbeat.with.ip.enable", false) ? StateStoreUtils.getIpPortString(this.router.getAdminServerAddress()) : StateStoreUtils.getHostPortString(this.router.getAdminServerAddress());
        LOG.info("Initialized MountTableRefresherService with addr: {}", (Object)this.localAdminAddress);
        this.cacheUpdateTimeout = conf.getTimeDuration("dfs.federation.router.mount-table.cache.update.timeout", RBFConfigKeys.MOUNT_TABLE_CACHE_UPDATE_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS);
        long routerClientMaxLiveTime = conf.getTimeDuration("dfs.federation.router.mount-table.cache.update.client.max.time", RBFConfigKeys.MOUNT_TABLE_CACHE_UPDATE_CLIENT_MAX_TIME_DEFAULT, TimeUnit.MILLISECONDS);
        this.routerClientsCache = CacheBuilder.newBuilder().expireAfterWrite(routerClientMaxLiveTime, TimeUnit.MILLISECONDS).removalListener(this.getClientRemover()).build(this.getClientCreator());
        this.initClientCacheCleaner(routerClientMaxLiveTime);
    }

    private void initClientCacheCleaner(long routerClientMaxLiveTime) {
        this.clientCacheCleanerScheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("MountTableRefresh_ClientsCacheCleaner").setDaemon(true).build());
        this.clientCacheCleanerScheduler.scheduleWithFixedDelay(() -> this.routerClientsCache.cleanUp(), routerClientMaxLiveTime, routerClientMaxLiveTime, TimeUnit.MILLISECONDS);
    }

    private RemovalListener<String, RouterClient> getClientRemover() {
        return new RemovalListener<String, RouterClient>(){

            public void onRemoval(RemovalNotification<String, RouterClient> notification) {
                MountTableRefresherService.this.closeRouterClient((RouterClient)notification.getValue());
            }
        };
    }

    @VisibleForTesting
    protected void closeRouterClient(RouterClient client) {
        try {
            client.close();
        }
        catch (IOException e) {
            LOG.error("Error while closing RouterClient", (Throwable)e);
        }
    }

    private CacheLoader<String, RouterClient> getClientCreator() {
        return new CacheLoader<String, RouterClient>(){

            public RouterClient load(String adminAddress) throws IOException {
                InetSocketAddress routerSocket = NetUtils.createSocketAddr((String)adminAddress);
                Configuration config = MountTableRefresherService.this.getConfig();
                return MountTableRefresherService.this.createRouterClient(routerSocket, config);
            }
        };
    }

    @VisibleForTesting
    protected RouterClient createRouterClient(InetSocketAddress routerSocket, Configuration config) throws IOException {
        return (RouterClient)SecurityUtil.doAsLoginUser(() -> {
            if (UserGroupInformation.isSecurityEnabled()) {
                UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab();
            }
            return new RouterClient(routerSocket, config);
        });
    }

    protected void serviceStart() throws Exception {
        super.serviceStart();
    }

    protected void serviceStop() throws Exception {
        super.serviceStop();
        this.clientCacheCleanerScheduler.shutdown();
        this.routerClientsCache.invalidateAll();
    }

    private MountTableStore getMountTableStore() throws IOException {
        MountTableStore mountTblStore = this.router.getStateStore().getRegisteredRecordStore(MountTableStore.class);
        if (mountTblStore == null) {
            throw new IOException("Mount table state store is not available.");
        }
        return mountTblStore;
    }

    public void refresh() throws StateStoreUnavailableException {
        RouterStore routerStore = this.router.getRouterStateManager();
        try {
            routerStore.loadCache(true);
        }
        catch (IOException e) {
            LOG.warn("RouterStore load cache failed,", (Throwable)e);
        }
        List cachedRecords = routerStore.getCachedRecords();
        ArrayList<MountTableRefresherThread> refreshThreads = new ArrayList<MountTableRefresherThread>();
        for (RouterState routerState : cachedRecords) {
            String adminAddress = routerState.getAdminAddress();
            if (adminAddress == null || adminAddress.isEmpty()) continue;
            if (routerState.getStatus() != RouterServiceState.RUNNING) {
                LOG.info("Router {} is not running. Mount table cache will not refresh.", (Object)routerState.getAddress());
                this.removeFromCache(adminAddress);
                continue;
            }
            if (this.isLocalAdmin(adminAddress)) {
                refreshThreads.add(this.getLocalRefresher(adminAddress));
                LOG.debug("Added local refresher for {}", (Object)adminAddress);
                continue;
            }
            try {
                RouterClient client = (RouterClient)this.routerClientsCache.get((Object)adminAddress);
                refreshThreads.add(new MountTableRefresherThread(client.getMountTableManager(), adminAddress));
                LOG.debug("Added remote refresher for {}", (Object)adminAddress);
            }
            catch (ExecutionException execExcep) {
                LOG.warn(ROUTER_CONNECT_ERROR_MSG, (Object)adminAddress, (Object)execExcep);
            }
        }
        if (!refreshThreads.isEmpty()) {
            this.invokeRefresh(refreshThreads);
        }
    }

    @VisibleForTesting
    protected MountTableRefresherThread getLocalRefresher(String adminAddress) {
        return new MountTableRefresherThread(this.router.getAdminServer(), adminAddress);
    }

    private void removeFromCache(String adminAddress) {
        this.routerClientsCache.invalidate((Object)adminAddress);
    }

    private void invokeRefresh(List<MountTableRefresherThread> refreshThreads) {
        CountDownLatch countDownLatch = new CountDownLatch(refreshThreads.size());
        for (MountTableRefresherThread refThread : refreshThreads) {
            refThread.setCountDownLatch(countDownLatch);
            refThread.start();
        }
        try {
            boolean allReqCompleted = countDownLatch.await(this.cacheUpdateTimeout, TimeUnit.MILLISECONDS);
            if (!allReqCompleted) {
                LOG.warn("Not all router admins updated their cache");
            }
        }
        catch (InterruptedException e) {
            LOG.error("Mount table cache refresher was interrupted.", (Throwable)e);
        }
        this.logResult(refreshThreads);
    }

    private boolean isLocalAdmin(String adminAddress) {
        return adminAddress.contentEquals(this.localAdminAddress);
    }

    private void logResult(List<MountTableRefresherThread> refreshThreads) {
        int successCount = 0;
        int failureCount = 0;
        for (MountTableRefresherThread mountTableRefreshThread : refreshThreads) {
            if (mountTableRefreshThread.isSuccess()) {
                ++successCount;
                continue;
            }
            LOG.debug("Failed to refresh {}", (Object)mountTableRefreshThread.getAdminAddress());
            ++failureCount;
            this.removeFromCache(mountTableRefreshThread.getAdminAddress());
        }
        LOG.info("Mount table entries cache refresh successCount={},failureCount={}", (Object)successCount, (Object)failureCount);
    }
}

