/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.federation.store.driver.impl;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils;
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreOperationResult;
import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreSerializableImpl;
import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
import org.apache.hadoop.hdfs.server.federation.store.records.Query;
import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.curator.ZKCuratorManager;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StateStoreZooKeeperImpl
extends StateStoreSerializableImpl {
    private static final Logger LOG = LoggerFactory.getLogger(StateStoreZooKeeperImpl.class);
    private ThreadPoolExecutor executorService;
    private boolean enableConcurrent;
    private String baseZNode;
    private ZKCuratorManager zkManager;
    private List<ACL> zkAcl;

    @Override
    public boolean initDriver() {
        LOG.info("Initializing ZooKeeper connection");
        Configuration conf = this.getConf();
        this.baseZNode = conf.get("dfs.federation.router.store.driver.zk.parent-path", "/hdfs-federation");
        int numThreads = conf.getInt("dfs.federation.router.store.driver.zk.async.max.threads", -1);
        boolean bl = this.enableConcurrent = numThreads > 0;
        if (this.enableConcurrent) {
            ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("StateStore ZK Client-%d").build();
            this.executorService = new ThreadPoolExecutor(numThreads, numThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory);
            LOG.info("Init StateStoreZookeeperImpl by async mode with {} threads.", (Object)numThreads);
        } else {
            LOG.info("Init StateStoreZookeeperImpl by sync mode.");
        }
        String zkHostPort = conf.get("dfs.federation.router.store.driver.zk.address");
        try {
            this.zkManager = new ZKCuratorManager(conf);
            this.zkManager.start(zkHostPort);
            this.zkAcl = ZKCuratorManager.getZKAcls((Configuration)conf);
        }
        catch (IOException e) {
            LOG.error("Cannot initialize the ZK connection", (Throwable)e);
            return false;
        }
        return true;
    }

    @Override
    public <T extends BaseRecord> boolean initRecordStorage(String className, Class<T> clazz) {
        try {
            String checkPath = ZKCuratorManager.getNodePath((String)this.baseZNode, (String)className);
            this.zkManager.createRootDirRecursively(checkPath, this.zkAcl);
            return true;
        }
        catch (Exception e) {
            LOG.error("Cannot initialize ZK node for {}: {}", (Object)className, (Object)e.getMessage());
            return false;
        }
    }

    @VisibleForTesting
    public void setEnableConcurrent(boolean enableConcurrent) {
        this.enableConcurrent = enableConcurrent;
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (this.executorService != null) {
            this.executorService.shutdown();
        }
        if (this.zkManager != null) {
            this.zkManager.close();
        }
    }

    @Override
    public boolean isDriverReady() {
        if (this.zkManager == null) {
            return false;
        }
        CuratorFramework curator = this.zkManager.getCurator();
        if (curator == null) {
            return false;
        }
        return curator.getState() == CuratorFrameworkState.STARTED;
    }

    @Override
    public <T extends BaseRecord> QueryResult<T> get(Class<T> clazz) throws IOException {
        this.verifyDriverReady();
        long start = Time.monotonicNow();
        ArrayList<BaseRecord> ret = new ArrayList<BaseRecord>();
        String znode = this.getZNodeForClass(clazz);
        try {
            ArrayList callables = new ArrayList();
            this.zkManager.getChildren(znode).forEach(c -> callables.add(() -> this.getRecord(clazz, znode, (String)c)));
            if (this.enableConcurrent) {
                List futures = this.executorService.invokeAll(callables);
                for (Future future : futures) {
                    if (future.get() == null) continue;
                    ret.add((BaseRecord)future.get());
                }
            } else {
                for (Callable callable : callables) {
                    BaseRecord record = (BaseRecord)callable.call();
                    if (record == null) continue;
                    ret.add(record);
                }
            }
        }
        catch (Exception e) {
            this.getMetrics().addFailure(Time.monotonicNow() - start);
            String msg = "Cannot get children for \"" + znode + "\": " + e.getMessage();
            LOG.error(msg);
            throw new IOException(msg);
        }
        long end = Time.monotonicNow();
        this.getMetrics().addRead(end - start);
        return new QueryResult(ret, this.getTime());
    }

    private <T extends BaseRecord> T getRecord(Class<T> clazz, String znode, String child) {
        T record = null;
        try {
            String path = ZKCuratorManager.getNodePath((String)znode, (String)child);
            Stat stat = new Stat();
            String data = this.zkManager.getStringData(path, stat);
            boolean corrupted = false;
            if (data == null || data.equals("")) {
                corrupted = true;
            } else {
                try {
                    record = this.createRecord(data, stat, clazz);
                }
                catch (IOException e) {
                    LOG.error("Cannot create record type \"{}\" from \"{}\": {}", new Object[]{clazz.getSimpleName(), data, e.getMessage()});
                    corrupted = true;
                }
            }
            if (corrupted) {
                LOG.error("Cannot get data for {} at {}, cleaning corrupted data", (Object)child, (Object)path);
                this.zkManager.delete(path);
            }
        }
        catch (Exception e) {
            LOG.error("Cannot get data for {}: {}", (Object)child, (Object)e.getMessage());
        }
        return record;
    }

    @Override
    public <T extends BaseRecord> StateStoreOperationResult putAll(List<T> records, boolean update, boolean error) throws IOException {
        this.verifyDriverReady();
        if (records.isEmpty()) {
            return StateStoreOperationResult.getDefaultSuccessResult();
        }
        BaseRecord record0 = (BaseRecord)records.get(0);
        Class<?> recordClass = record0.getClass();
        String znode = this.getZNodeForClass(recordClass);
        long start = Time.monotonicNow();
        AtomicBoolean status = new AtomicBoolean(true);
        ArrayList callables = new ArrayList();
        List<String> failedRecordsKeys = Collections.synchronizedList(new ArrayList());
        records.forEach(record -> callables.add(() -> {
            byte[] data;
            String primaryKey = StateStoreZooKeeperImpl.getPrimaryKey(record);
            String recordZNode = ZKCuratorManager.getNodePath((String)znode, (String)primaryKey);
            if (!this.writeNode(recordZNode, data = this.serialize(record), update, error)) {
                failedRecordsKeys.add(StateStoreZooKeeperImpl.getOriginalPrimaryKey(primaryKey));
                status.set(false);
            }
            return null;
        }));
        try {
            if (this.enableConcurrent) {
                this.executorService.invokeAll(callables);
            } else {
                for (Callable callable : callables) {
                    callable.call();
                }
            }
        }
        catch (Exception e) {
            LOG.error("Write record failed : {}", (Object)e.getMessage(), (Object)e);
            throw new IOException(e);
        }
        long end = Time.monotonicNow();
        if (status.get()) {
            this.getMetrics().addWrite(end - start);
        } else {
            this.getMetrics().addFailure(end - start);
        }
        return new StateStoreOperationResult(failedRecordsKeys, status.get());
    }

    @Override
    public <T extends BaseRecord> Map<Query<T>, Integer> remove(Class<T> clazz, List<Query<T>> queries) throws IOException {
        List<T> records;
        this.verifyDriverReady();
        HashMap<Query<T>, Integer> ret = new HashMap<Query<T>, Integer>();
        List trueRemoved = Collections.synchronizedList(new ArrayList());
        if (queries.isEmpty()) {
            return ret;
        }
        long start = Time.monotonicNow();
        try {
            QueryResult<T> result = this.get(clazz);
            records = result.getRecords();
        }
        catch (IOException ex) {
            LOG.error("Cannot get existing records", (Throwable)ex);
            this.getMetrics().addFailure(Time.monotonicNow() - start);
            return ret;
        }
        String znode = this.getZNodeForClass(clazz);
        HashSet<T> recordsToRemove = new HashSet<T>();
        HashMap<Query<T>, List<T>> queryToRecords = new HashMap<Query<T>, List<T>>();
        for (Query<T> query : queries) {
            List<T> filtered = StateStoreUtils.filterMultiple(query, records);
            queryToRecords.put(query, filtered);
            recordsToRemove.addAll(filtered);
        }
        ArrayList callables = new ArrayList();
        recordsToRemove.forEach(existingRecord -> callables.add(() -> {
            LOG.info("Removing \"{}\"", existingRecord);
            try {
                String primaryKey = StateStoreZooKeeperImpl.getPrimaryKey(existingRecord);
                String path = ZKCuratorManager.getNodePath((String)znode, (String)primaryKey);
                if (this.zkManager.delete(path)) {
                    trueRemoved.add(existingRecord);
                } else {
                    LOG.error("Did not remove \"{}\"", existingRecord);
                }
            }
            catch (Exception e) {
                LOG.error("Cannot remove \"{}\"", existingRecord, (Object)e);
                this.getMetrics().addFailure(Time.monotonicNow() - start);
            }
            return null;
        }));
        try {
            if (this.enableConcurrent) {
                this.executorService.invokeAll(callables);
            } else {
                for (Callable callable : callables) {
                    callable.call();
                }
            }
        }
        catch (Exception exception) {
            LOG.error("Record removal failed : {}", (Object)exception.getMessage(), (Object)exception);
        }
        long l = Time.monotonicNow();
        if (!trueRemoved.isEmpty()) {
            this.getMetrics().addRemove(l - start);
        }
        for (Map.Entry entry : queryToRecords.entrySet()) {
            for (BaseRecord record : (List)entry.getValue()) {
                if (!trueRemoved.contains(record)) continue;
                ret.compute((Query)entry.getKey(), (k, v) -> v == null ? 1 : v + 1);
            }
        }
        return ret;
    }

    @Override
    public <T extends BaseRecord> int remove(Class<T> clazz, Query<T> query) throws IOException {
        return this.remove(clazz, Collections.singletonList(query)).get(query);
    }

    @Override
    public <T extends BaseRecord> boolean removeAll(Class<T> clazz) throws IOException {
        long start = Time.monotonicNow();
        boolean status = true;
        String znode = this.getZNodeForClass(clazz);
        LOG.info("Deleting all children under {}", (Object)znode);
        try {
            List children = this.zkManager.getChildren(znode);
            for (String child : children) {
                String path = ZKCuratorManager.getNodePath((String)znode, (String)child);
                LOG.info("Deleting {}", (Object)path);
                this.zkManager.delete(path);
            }
        }
        catch (Exception e) {
            LOG.error("Cannot remove {}: {}", (Object)znode, (Object)e.getMessage());
            status = false;
        }
        long time = Time.monotonicNow() - start;
        if (status) {
            this.getMetrics().addRemove(time);
        } else {
            this.getMetrics().addFailure(time);
        }
        return status;
    }

    private boolean writeNode(String znode, byte[] bytes, boolean update, boolean error) {
        try {
            boolean created = this.zkManager.create(znode);
            if (!update && !created && error) {
                LOG.info("Cannot write record \"{}\", it already exists", (Object)znode);
                return false;
            }
            this.zkManager.setData(znode, bytes, -1);
            return true;
        }
        catch (Exception e) {
            LOG.error("Cannot write record \"{}\": {}", (Object)znode, (Object)e.getMessage());
            return false;
        }
    }

    private <T extends BaseRecord> String getZNodeForClass(Class<T> clazz) {
        String className = StateStoreUtils.getRecordName(clazz);
        return ZKCuratorManager.getNodePath((String)this.baseZNode, (String)className);
    }

    private <T extends BaseRecord> T createRecord(String data, Stat stat, Class<T> clazz) throws IOException {
        T record = this.newRecord(data, clazz, false);
        ((BaseRecord)record).setDateCreated(stat.getCtime());
        ((BaseRecord)record).setDateModified(stat.getMtime());
        return record;
    }
}

