/*
 * Decompiled with CFR 0.152.
 */
package org.apache.impala.catalog;

import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.impala.catalog.FileDescriptor;
import org.apache.impala.catalog.FileMetadataLoader;
import org.apache.impala.catalog.HdfsFileFormat;
import org.apache.impala.catalog.HdfsPartition;
import org.apache.impala.catalog.TableLoadingException;
import org.apache.impala.common.FileSystemUtil;
import org.apache.impala.common.Pair;
import org.apache.impala.service.BackendConfig;
import org.apache.impala.thrift.TNetworkAddress;
import org.apache.impala.util.ListMap;
import org.apache.impala.util.ThreadNameAnnotator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runs a set of per-path metadata loading tasks on an executor and hands each
 * task's result to a matching updater.
 *
 * <p>Tasks ("loaders") and updaters are kept in two lists of equal length; the
 * updater at index {@code i} consumes the result of the loader at index
 * {@code i}. Loaders are submitted to the executor, while updaters are invoked
 * by the thread that calls {@link #load()}.
 */
public class ParallelFileMetadataLoader {
    private static final Logger LOG = LoggerFactory.getLogger(ParallelFileMetadataLoader.class);

    /** Pool size cap used when {@code FileSystemUtil.supportsStorageIds(fs)} is true. */
    public static final int MAX_HDFS_PARTITIONS_PARALLEL_LOAD = BackendConfig.INSTANCE.maxHdfsPartsParallelLoad();

    /** Pool size cap used for every other file system. */
    private static final int MAX_NON_HDFS_PARTITIONS_PARALLEL_LOAD = BackendConfig.INSTANCE.maxNonHdfsPartsParallelLoad();

    /** Sum of the pool sizes of all {@link #load()} calls currently in progress. */
    public static final AtomicInteger TOTAL_THREADS = new AtomicInteger(0);

    /** Number of {@link #load()} calls currently in progress. */
    public static final AtomicInteger TOTAL_TABLES = new AtomicInteger(0);

    /** Failures beyond this count are still counted but no longer logged individually. */
    private static final int MAX_PATH_METADATA_LOADING_ERRORS_TO_LOG = 100;

    private final String logPrefix_;
    private final FileSystem fs_;
    private final List<Pair<String, Callable<Object>>> loaders_;
    private final List<Consumer<Object>> updaters_;

    /**
     * Builds one {@link FileMetadataLoader} per distinct partition location.
     *
     * <p>Partition builders that share a location are grouped and share a single
     * loader. The loader is seeded with the file descriptors and file format of
     * the first builder in the group, and is told to force-refresh block
     * locations when any builder in the group is marked cached.
     *
     * @param fs file system used to choose the pool size in {@link #load()}
     * @param partBuilders partition builders to refresh
     * @param writeIdList must be null exactly when {@code validTxnList} is null
     * @param validTxnList must be null exactly when {@code writeIdList} is null
     * @param isRecursive passed through to each {@link FileMetadataLoader}
     * @param hostIndex passed through to each {@link FileMetadataLoader}; may be null
     * @param debugAction passed through to each {@link FileMetadataLoader}
     * @param logPrefix prefix used in log and exception messages
     * @throws IllegalStateException if only one of {@code writeIdList} and
     *     {@code validTxnList} is set, or a group's file format is ICEBERG
     */
    public ParallelFileMetadataLoader(FileSystem fs, Collection<HdfsPartition.Builder> partBuilders, ValidWriteIdList writeIdList, ValidTxnList validTxnList, boolean isRecursive, @Nullable ListMap<TNetworkAddress> hostIndex, String debugAction, String logPrefix) {
        this.fs_ = fs;
        this.logPrefix_ = logPrefix;
        if (writeIdList != null || validTxnList != null) {
            // The two transactional lists only make sense as a pair.
            Preconditions.checkState(writeIdList != null && validTxnList != null);
        }

        Map<String, List<HdfsPartition.Builder>> partsByPath = new HashMap<>();
        for (HdfsPartition.Builder builder : partBuilders) {
            partsByPath.computeIfAbsent(builder.getLocation(), path -> new ArrayList<>()).add(builder);
        }

        this.loaders_ = new ArrayList<>(partsByPath.size());
        this.updaters_ = new ArrayList<>(partsByPath.size());
        for (Map.Entry<String, List<HdfsPartition.Builder>> entry : partsByPath.entrySet()) {
            String path = entry.getKey();
            List<HdfsPartition.Builder> builders = entry.getValue();
            HdfsPartition.Builder firstBuilder = builders.get(0);
            List<FileDescriptor> oldFds = firstBuilder.getFileDescriptors();
            HdfsFileFormat format = firstBuilder.getFileFormat();
            Preconditions.checkState(!HdfsFileFormat.ICEBERG.equals(format));

            FileMetadataLoader loader = new FileMetadataLoader(path, isRecursive, oldFds, hostIndex, validTxnList, writeIdList, format);
            boolean hasCachedPartition = Iterables.any(builders, HdfsPartition.Builder::isMarkedCached);
            loader.setForceRefreshBlockLocations(hasCachedPartition);
            loader.setDebugAction(debugAction);

            // The task returns the loader itself so the updater can read its results.
            Callable<Object> task = () -> {
                loader.load();
                return loader;
            };
            this.loaders_.add(new Pair<>(path, task));
            this.updaters_.add(result -> updatePartBuilders((FileMetadataLoader) result, builders));
        }
    }

    /**
     * Copies the results of a finished loader into every builder whose files
     * changed; builders whose files are unchanged are left untouched.
     */
    private static void updatePartBuilders(FileMetadataLoader loader, List<HdfsPartition.Builder> builders) {
        for (HdfsPartition.Builder partBuilder : builders) {
            if (!loader.hasFilesChangedCompareTo(partBuilder.getFileDescriptors())) {
                LOG.trace("Detected files unchanged on partition {}", partBuilder.getPartitionName());
                continue;
            }
            partBuilder.clearFileDescriptors();
            List<FileDescriptor> deleteDescriptors = loader.getLoadedDeleteDeltaFds();
            if (deleteDescriptors != null && !deleteDescriptors.isEmpty()) {
                // Delete deltas are present: store insert and delete descriptors separately.
                partBuilder.setInsertFileDescriptors(loader.getLoadedInsertDeltaFds());
                partBuilder.setDeleteFileDescriptors(deleteDescriptors);
            } else {
                partBuilder.setFileDescriptors(loader.getLoadedFds());
            }
            partBuilder.setFileMetadataStats(loader.getFileMetadataStats());
        }
    }

    /**
     * Creates a loader from caller-supplied tasks and updaters.
     *
     * @param fs file system used to choose the pool size in {@link #load()}
     * @param logPrefix prefix used in log and exception messages
     * @param loaders pairs of (path, task); the path is only used in error logs
     * @param updaters consumers of the task results, index-aligned with {@code loaders}
     * @throws NullPointerException if {@code loaders} or {@code updaters} is null
     * @throws IllegalArgumentException if the two lists differ in size
     */
    public ParallelFileMetadataLoader(FileSystem fs, String logPrefix, List<Pair<String, Callable<Object>>> loaders, List<Consumer<Object>> updaters) {
        Preconditions.checkNotNull(loaders);
        Preconditions.checkNotNull(updaters);
        Preconditions.checkArgument(loaders.size() == updaters.size());
        this.fs_ = fs;
        this.logPrefix_ = logPrefix;
        this.loaders_ = loaders;
        this.updaters_ = updaters;
    }

    /**
     * Submits every loader to an executor, waits for each result in submission
     * order and passes it to the matching updater.
     *
     * <p>A failed task does not stop the remaining ones from being awaited. At
     * most {@code MAX_PATH_METADATA_LOADING_ERRORS_TO_LOG} individual failures
     * are logged. If the calling thread is interrupted while waiting, the
     * interrupted task is counted as failed and the thread's interrupt status is
     * restored before this method returns or throws.
     *
     * @throws TableLoadingException if at least one task failed
     */
    public void load() throws TableLoadingException {
        if (this.loaders_.isEmpty()) {
            return;
        }
        int failedLoadTasks = 0;
        boolean interrupted = false;
        int poolSize = getPoolSize(this.loaders_.size(), this.fs_);
        ExecutorService pool = createPool(poolSize, this.logPrefix_);
        // Both counters are bumped before the try so that the finally block
        // always undoes exactly what was done here.
        TOTAL_THREADS.addAndGet(poolSize);
        TOTAL_TABLES.incrementAndGet();
        try (ThreadNameAnnotator tna = new ThreadNameAnnotator(this.logPrefix_)) {
            List<Pair<String, Future<Object>>> futures = new ArrayList<>(this.loaders_.size());
            for (Pair<String, Callable<Object>> loader : this.loaders_) {
                Future<Object> future = pool.submit(loader.second);
                futures.add(new Pair<>(loader.first, future));
            }
            Preconditions.checkState(futures.size() == this.updaters_.size());
            for (int i = 0; i < futures.size(); ++i) {
                Pair<String, Future<Object>> future = futures.get(i);
                try {
                    Object result = future.second.get();
                    this.updaters_.get(i).accept(result);
                } catch (InterruptedException | ExecutionException e) {
                    if (e instanceof InterruptedException) {
                        // Remember the interrupt; it is re-asserted in the finally
                        // block so the remaining futures can still be awaited.
                        interrupted = true;
                    }
                    ++failedLoadTasks;
                    if (failedLoadTasks <= MAX_PATH_METADATA_LOADING_ERRORS_TO_LOG) {
                        LOG.error("{} encountered an error loading data for path {}", this.logPrefix_, future.first, e);
                    }
                }
            }
        } finally {
            pool.shutdown();
            TOTAL_THREADS.addAndGet(-poolSize);
            TOTAL_TABLES.decrementAndGet();
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
        if (failedLoadTasks > 0) {
            if (failedLoadTasks > MAX_PATH_METADATA_LOADING_ERRORS_TO_LOG) {
                LOG.error("{} error loading {} paths. Only the first {} errors were logged", this.logPrefix_, failedLoadTasks, MAX_PATH_METADATA_LOADING_ERRORS_TO_LOG);
            }
            throw new TableLoadingException(this.logPrefix_ + ": failed to load " + failedLoadTasks + " paths. Check the catalog server log for more details.");
        }
    }

    /**
     * Creates the executor used to run loader tasks.
     *
     * @param poolSize number of threads; must be positive
     * @param logPrefix prefix for the informational log line
     * @return a direct (caller-runs) executor when {@code poolSize} is 1,
     *     otherwise a fixed thread pool of {@code poolSize} threads
     * @throws IllegalStateException if {@code poolSize} is not positive
     */
    public static ExecutorService createPool(int poolSize, String logPrefix) {
        // Guava's Preconditions substitutes %s, not the SLF4J-style {}.
        Preconditions.checkState(poolSize > 0, "Illegal poolSize: %s", poolSize);
        if (poolSize == 1) {
            return MoreExecutors.newDirectExecutorService();
        }
        LOG.info("{} using a thread pool of size {}", logPrefix, poolSize);
        return Executors.newFixedThreadPool(poolSize);
    }

    /**
     * Returns the number of threads to use for {@code numLoaders} tasks on
     * {@code fs}: the configured cap for the file system kind, but never more
     * than the number of tasks.
     */
    public static int getPoolSize(int numLoaders, FileSystem fs) {
        int maxParallelism = FileSystemUtil.supportsStorageIds(fs) ? MAX_HDFS_PARTITIONS_PARALLEL_LOAD : MAX_NON_HDFS_PARTITIONS_PARALLEL_LOAD;
        return Math.min(numLoaders, maxParallelism);
    }
}

