/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.federation.store.driver.impl;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.lang.invoke.CallSite;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMetrics;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreUnavailableException;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils;
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreOperationResult;
import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreSerializableImpl;
import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
import org.apache.hadoop.hdfs.server.federation.store.records.Query;
import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class StateStoreFileBaseImpl
extends StateStoreSerializableImpl {
    private static final Logger LOG = LoggerFactory.getLogger(StateStoreFileBaseImpl.class);
    private static final String TMP_MARK = ".tmp";
    private static final long OLD_TMP_RECORD_MS = TimeUnit.SECONDS.toMillis(10L);
    private static final Pattern OLD_TMP_RECORD_PATTERN = Pattern.compile(".+\\.(\\d+)\\.tmp");
    private boolean initialized = false;
    private ExecutorService concurrentStoreAccessPool;

    protected abstract <T extends BaseRecord> BufferedReader getReader(String var1);

    @VisibleForTesting
    public abstract <T extends BaseRecord> BufferedWriter getWriter(String var1);

    protected abstract boolean exists(String var1);

    protected abstract boolean mkdir(String var1);

    protected abstract boolean rename(String var1, String var2);

    protected abstract boolean remove(String var1);

    protected abstract List<String> getChildren(String var1);

    protected abstract String getRootDir();

    protected abstract int getConcurrentFilesAccessNumThreads();

    public void setInitialized(boolean ini) {
        this.initialized = ini;
    }

    @Override
    public boolean initDriver() {
        String rootDir = this.getRootDir();
        try {
            if (rootDir == null) {
                LOG.error("Invalid root directory, unable to initialize driver.");
                return false;
            }
            if (!this.exists(rootDir) && !this.mkdir(rootDir)) {
                LOG.error("Cannot create State Store root directory {}", (Object)rootDir);
                return false;
            }
        }
        catch (Exception ex) {
            LOG.error("Cannot initialize filesystem using root directory {}", (Object)rootDir, (Object)ex);
            return false;
        }
        this.setInitialized(true);
        int threads = this.getConcurrentFilesAccessNumThreads();
        if (threads > 1) {
            this.concurrentStoreAccessPool = new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactoryBuilder().setNameFormat("state-store-file-based-concurrent-%d").setDaemon(true).build());
            LOG.info("File based state store will be accessed concurrently with {} max threads", (Object)threads);
        } else {
            LOG.info("File based state store will be accessed serially");
        }
        return true;
    }

    @Override
    public void close() throws Exception {
        if (this.concurrentStoreAccessPool != null) {
            this.concurrentStoreAccessPool.shutdown();
            boolean isTerminated = this.concurrentStoreAccessPool.awaitTermination(5L, TimeUnit.SECONDS);
            LOG.info("Concurrent store access pool is terminated: {}", (Object)isTerminated);
            this.concurrentStoreAccessPool = null;
        }
    }

    @Override
    public <T extends BaseRecord> boolean initRecordStorage(String className, Class<T> recordClass) {
        String dataDirPath = this.getRootDir() + "/" + className;
        try {
            if (!this.exists(dataDirPath)) {
                LOG.info("{} data directory doesn't exist, creating it", (Object)dataDirPath);
                if (!this.mkdir(dataDirPath)) {
                    LOG.error("Cannot create data directory {}", (Object)dataDirPath);
                    return false;
                }
            }
        }
        catch (Exception ex) {
            LOG.error("Cannot create data directory {}", (Object)dataDirPath, (Object)ex);
            return false;
        }
        return true;
    }

    @Override
    public <T extends BaseRecord> QueryResult<T> get(Class<T> clazz) throws IOException {
        this.verifyDriverReady();
        long start = Time.monotonicNow();
        StateStoreMetrics metrics = this.getMetrics();
        List result = Collections.synchronizedList(new ArrayList());
        try {
            String path = this.getPathForClass(clazz);
            List<String> children = this.getChildren(path);
            ArrayList callables = new ArrayList();
            children.forEach(child -> callables.add(() -> this.getRecordsFromFileAndRemoveOldTmpRecords(clazz, result, path, (String)child)));
            if (this.concurrentStoreAccessPool != null) {
                List futures = this.concurrentStoreAccessPool.invokeAll(callables);
                for (Future future : futures) {
                    future.get();
                }
            } else {
                callables.forEach(e -> {
                    try {
                        e.call();
                    }
                    catch (Exception ex) {
                        LOG.error("Failed to retrieve record using file operations.", (Throwable)ex);
                        throw new RuntimeException(ex);
                    }
                });
            }
        }
        catch (Exception e2) {
            if (metrics != null) {
                metrics.addFailure(Time.monotonicNow() - start);
            }
            String msg = "Cannot fetch records for " + clazz.getSimpleName();
            LOG.error(msg, (Throwable)e2);
            throw new IOException(msg, e2);
        }
        if (metrics != null) {
            metrics.addRead(Time.monotonicNow() - start);
        }
        return new QueryResult(result, this.getTime());
    }

    private <T extends BaseRecord> Void getRecordsFromFileAndRemoveOldTmpRecords(Class<T> clazz, List<T> result, String path, String child) throws IOException {
        String pathRecord = path + "/" + child;
        if (child.endsWith(TMP_MARK)) {
            LOG.debug("There is a temporary file {} in {}", (Object)child, (Object)path);
            if (StateStoreFileBaseImpl.isOldTempRecord(child)) {
                LOG.warn("Removing {} as it's an old temporary record", (Object)child);
                this.remove(pathRecord);
            }
        } else {
            T record = this.getRecord(pathRecord, clazz);
            result.add(record);
        }
        return null;
    }

    @VisibleForTesting
    public static boolean isOldTempRecord(String pathRecord) {
        if (!pathRecord.endsWith(TMP_MARK)) {
            return false;
        }
        Matcher m = OLD_TMP_RECORD_PATTERN.matcher(pathRecord);
        if (m.find()) {
            long time = Long.parseLong(m.group(1));
            return Time.now() - time > OLD_TMP_RECORD_MS;
        }
        return false;
    }

    /*
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    private <T extends BaseRecord> T getRecord(String path, Class<T> clazz) throws IOException {
        BufferedReader reader = this.getReader(path);
        block8: while (true) {
            String line;
            while ((line = reader.readLine()) != null) {
                T t;
                if (line.startsWith("#") || line.length() <= 0) continue;
                try {
                    t = this.newRecord(line, clazz, false);
                }
                catch (Exception ex) {
                    try {
                        LOG.error("Cannot parse line {} in file {}", new Object[]{line, path, ex});
                        continue block8;
                    }
                    catch (Throwable throwable) {
                        throw throwable;
                        throw new IOException("Cannot read " + path + " for record " + clazz.getSimpleName());
                    }
                }
                return t;
            }
        }
        finally {
            if (reader != null) {
                reader.close();
            }
        }
    }

    private <T extends BaseRecord> String getPathForClass(Class<T> clazz) {
        String className = StateStoreUtils.getRecordName(clazz);
        StringBuilder sb = new StringBuilder();
        sb.append(this.getRootDir());
        if (sb.charAt(sb.length() - 1) != '/') {
            sb.append("/");
        }
        sb.append(className);
        return sb.toString();
    }

    @Override
    public boolean isDriverReady() {
        return this.initialized;
    }

    @Override
    public <T extends BaseRecord> StateStoreOperationResult putAll(List<T> records, boolean allowUpdate, boolean errorIfExists) throws StateStoreUnavailableException {
        this.verifyDriverReady();
        if (records.isEmpty()) {
            return StateStoreOperationResult.getDefaultSuccessResult();
        }
        long start = Time.monotonicNow();
        StateStoreMetrics metrics = this.getMetrics();
        HashMap<CallSite, BaseRecord> toWrite = new HashMap<CallSite, BaseRecord>();
        List<String> failedRecordsKeys = Collections.synchronizedList(new ArrayList());
        AtomicBoolean success = new AtomicBoolean(true);
        for (BaseRecord record : records) {
            String primaryKey;
            Class<?> recordClass = record.getClass();
            String path = this.getPathForClass(recordClass);
            String recordPath = path + "/" + (primaryKey = StateStoreFileBaseImpl.getPrimaryKey(record));
            if (this.exists(recordPath)) {
                if (allowUpdate) {
                    record.setDateModified(this.getTime());
                    toWrite.put((CallSite)((Object)recordPath), record);
                    continue;
                }
                if (errorIfExists) {
                    LOG.error("Attempt to insert record {} that already exists", (Object)recordPath);
                    failedRecordsKeys.add(StateStoreFileBaseImpl.getOriginalPrimaryKey(primaryKey));
                    success.set(false);
                    continue;
                }
                LOG.debug("Not updating {}", (Object)record);
                continue;
            }
            toWrite.put((CallSite)((Object)recordPath), record);
        }
        ArrayList callables = new ArrayList();
        toWrite.entrySet().forEach(entry -> callables.add(() -> this.writeRecordToFile(success, (Map.Entry)entry, failedRecordsKeys)));
        if (this.concurrentStoreAccessPool != null) {
            List futures = null;
            try {
                futures = this.concurrentStoreAccessPool.invokeAll(callables);
            }
            catch (InterruptedException e) {
                success.set(false);
                LOG.error("Failed to put record concurrently.", (Throwable)e);
            }
            if (futures != null) {
                for (Future future : futures) {
                    try {
                        future.get();
                    }
                    catch (InterruptedException | ExecutionException e) {
                        success.set(false);
                        LOG.error("Failed to retrieve results from concurrent record put runs.", (Throwable)e);
                    }
                }
            }
        } else {
            callables.forEach(callable -> {
                try {
                    callable.call();
                }
                catch (Exception e) {
                    success.set(false);
                    LOG.error("Failed to put record.", (Throwable)e);
                }
            });
        }
        long end = Time.monotonicNow();
        if (metrics != null) {
            if (success.get()) {
                metrics.addWrite(end - start);
            } else {
                metrics.addFailure(end - start);
            }
        }
        return new StateStoreOperationResult(failedRecordsKeys, success.get());
    }

    private <T extends BaseRecord> Void writeRecordToFile(AtomicBoolean success, Map.Entry<String, T> entry, List<String> failedRecordsList) {
        String recordPath = entry.getKey();
        BaseRecord record = (BaseRecord)entry.getValue();
        String primaryKey = StateStoreFileBaseImpl.getPrimaryKey(record);
        String recordPathTemp = recordPath + "." + Time.now() + TMP_MARK;
        boolean recordWrittenSuccessfully = true;
        try (BufferedWriter writer = this.getWriter(recordPathTemp);){
            String line = this.serializeString(record);
            writer.write(line);
        }
        catch (IOException e) {
            LOG.error("Cannot write {}", (Object)recordPathTemp, (Object)e);
            recordWrittenSuccessfully = false;
            failedRecordsList.add(StateStoreFileBaseImpl.getOriginalPrimaryKey(primaryKey));
            success.set(false);
        }
        if (recordWrittenSuccessfully && !this.rename(recordPathTemp, recordPath)) {
            LOG.error("Failed committing record into {}", (Object)recordPath);
            failedRecordsList.add(StateStoreFileBaseImpl.getOriginalPrimaryKey(primaryKey));
            success.set(false);
        }
        return null;
    }

    @Override
    public <T extends BaseRecord> int remove(Class<T> clazz, Query<T> query) throws StateStoreUnavailableException {
        int removed;
        StateStoreMetrics metrics;
        long start;
        block8: {
            this.verifyDriverReady();
            if (query == null) {
                return 0;
            }
            start = Time.monotonicNow();
            metrics = this.getMetrics();
            removed = 0;
            try {
                QueryResult<T> result = this.get(clazz);
                List<T> existingRecords = result.getRecords();
                List<BaseRecord> recordsToRemove = StateStoreUtils.filterMultiple(query, existingRecords);
                boolean success = true;
                for (BaseRecord recordToRemove : recordsToRemove) {
                    String primaryKey;
                    String path = this.getPathForClass(clazz);
                    String recordToRemovePath = path + "/" + (primaryKey = StateStoreFileBaseImpl.getPrimaryKey(recordToRemove));
                    if (this.remove(recordToRemovePath)) {
                        ++removed;
                        continue;
                    }
                    LOG.error("Cannot remove record {}", (Object)recordToRemovePath);
                    success = false;
                }
                if (!success) {
                    LOG.error("Cannot remove records {} query {}", clazz, query);
                    if (metrics != null) {
                        metrics.addFailure(Time.monotonicNow() - start);
                    }
                }
            }
            catch (IOException e) {
                LOG.error("Cannot remove records {} query {}", new Object[]{clazz, query, e});
                if (metrics == null) break block8;
                metrics.addFailure(Time.monotonicNow() - start);
            }
        }
        if (removed > 0 && metrics != null) {
            metrics.addRemove(Time.monotonicNow() - start);
        }
        return removed;
    }

    @Override
    public <T extends BaseRecord> boolean removeAll(Class<T> clazz) throws StateStoreUnavailableException {
        this.verifyDriverReady();
        long start = Time.monotonicNow();
        StateStoreMetrics metrics = this.getMetrics();
        boolean success = true;
        String path = this.getPathForClass(clazz);
        List<String> children = this.getChildren(path);
        for (String child : children) {
            String pathRecord = path + "/" + child;
            if (this.remove(pathRecord)) continue;
            success = false;
        }
        if (metrics != null) {
            long time = Time.monotonicNow() - start;
            if (success) {
                metrics.addRemove(time);
            } else {
                metrics.addFailure(time);
            }
        }
        return success;
    }
}

