/*
 * Decompiled with CFR 0.152.
 */
package org.apache.impala.catalog;

import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.impala.catalog.FileMetadataLoader;
import org.apache.impala.catalog.HdfsFileFormat;
import org.apache.impala.catalog.HdfsPartition;
import org.apache.impala.catalog.IcebergFileMetadataLoader;
import org.apache.impala.catalog.TableLoadingException;
import org.apache.impala.catalog.iceberg.GroupedContentFiles;
import org.apache.impala.common.FileSystemUtil;
import org.apache.impala.common.Pair;
import org.apache.impala.service.BackendConfig;
import org.apache.impala.thrift.TNetworkAddress;
import org.apache.impala.util.ListMap;
import org.apache.impala.util.ThreadNameAnnotator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Loads file metadata for a collection of partitions, using one
 * {@link FileMetadataLoader} per distinct fully-qualified partition path.
 * Partitions that share a path share a single loader. The loaders are run
 * on an executor whose size is capped by the backend configuration and by
 * the number of loaders.
 */
public class ParallelFileMetadataLoader {
    private static final Logger LOG = LoggerFactory.getLogger(ParallelFileMetadataLoader.class);
    private static final int MAX_HDFS_PARTITIONS_PARALLEL_LOAD = BackendConfig.INSTANCE.maxHdfsPartsParallelLoad();
    private static final int MAX_NON_HDFS_PARTITIONS_PARALLEL_LOAD = BackendConfig.INSTANCE.maxNonHdfsPartsParallelLoad();

    /** Sum of the pool sizes of all loads currently running in {@link #loadInternal()}. */
    public static final AtomicInteger TOTAL_THREADS = new AtomicInteger(0);

    /** Number of loads currently running in {@link #loadInternal()}. */
    public static final AtomicInteger TOTAL_TABLES = new AtomicInteger(0);

    /** Upper bound on the number of per-path failures that are logged individually. */
    private static final int MAX_PATH_METADATA_LOADING_ERRORS_TO_LOG = 100;

    private final String logPrefix_;
    private final Map<Path, FileMetadataLoader> loaders_;
    private final Map<Path, List<HdfsPartition.Builder>> partsByPath_;
    private final FileSystem fs_;

    /**
     * Convenience constructor that delegates to the full constructor with an
     * empty {@link GroupedContentFiles} and
     * {@code canDataBeOutsideOfTableLocation == false}.
     */
    public ParallelFileMetadataLoader(FileSystem fs, Collection<HdfsPartition.Builder> partBuilders, ValidWriteIdList writeIdList, ValidTxnList validTxnList, boolean isRecursive, @Nullable ListMap<TNetworkAddress> hostIndex, String debugAction, String logPrefix) {
        this(fs, partBuilders, writeIdList, validTxnList, isRecursive, hostIndex, debugAction, logPrefix, new GroupedContentFiles(), false);
    }

    /**
     * Groups the given partition builders by fully-qualified location and
     * creates one loader per distinct location.
     *
     * @param fs file system used to choose the thread pool size
     * @param partBuilders partitions whose file metadata should be loaded
     * @param writeIdList must be null if and only if {@code validTxnList} is null
     * @param validTxnList must be null if and only if {@code writeIdList} is null
     * @param isRecursive passed through to each loader
     * @param hostIndex passed through to each loader; may be null
     * @param debugAction passed to each loader via {@code setDebugAction}
     * @param logPrefix prefix for log and exception messages
     * @param icebergFiles must be non-null when any partition has the ICEBERG format
     * @param canDataBeOutsideOfTableLocation passed through to Iceberg loaders
     * @throws IllegalStateException if exactly one of {@code writeIdList} and
     *     {@code validTxnList} is null
     */
    public ParallelFileMetadataLoader(FileSystem fs, Collection<HdfsPartition.Builder> partBuilders, ValidWriteIdList writeIdList, ValidTxnList validTxnList, boolean isRecursive, @Nullable ListMap<TNetworkAddress> hostIndex, String debugAction, String logPrefix, GroupedContentFiles icebergFiles, boolean canDataBeOutsideOfTableLocation) {
        if (writeIdList != null || validTxnList != null) {
            // The two lists are only meaningful together.
            Preconditions.checkState(writeIdList != null && validTxnList != null);
        }

        partsByPath_ = Maps.newHashMap();
        for (HdfsPartition.Builder builder : partBuilders) {
            Path partPath = FileSystemUtil.createFullyQualifiedPath(new Path(builder.getLocation()));
            partsByPath_.computeIfAbsent(partPath, path -> new ArrayList<>()).add(builder);
        }

        loaders_ = Maps.newHashMap();
        for (Map.Entry<Path, List<HdfsPartition.Builder>> entry : partsByPath_.entrySet()) {
            Path partDir = entry.getKey();
            List<HdfsPartition.Builder> builders = entry.getValue();
            // The first builder at this path supplies the old descriptors and the format.
            HdfsPartition.Builder firstBuilder = builders.get(0);
            List<HdfsPartition.FileDescriptor> oldFds = firstBuilder.getFileDescriptors();
            HdfsFileFormat format = firstBuilder.getFileFormat();

            FileMetadataLoader loader;
            if (format.equals(HdfsFileFormat.ICEBERG)) {
                loader = new IcebergFileMetadataLoader(partDir, isRecursive, oldFds, hostIndex, validTxnList, writeIdList, Preconditions.checkNotNull(icebergFiles), canDataBeOutsideOfTableLocation);
            } else {
                loader = new FileMetadataLoader(partDir, isRecursive, oldFds, hostIndex, validTxnList, writeIdList, format);
            }

            // Force a block location refresh if any partition at this path is marked cached.
            boolean hasCachedPartition = Iterables.any(builders, HdfsPartition.Builder::isMarkedCached);
            loader.setForceRefreshBlockLocations(hasCachedPartition);
            loader.setDebugAction(debugAction);
            loaders_.put(partDir, loader);
        }

        logPrefix_ = logPrefix;
        fs_ = fs;
    }

    /**
     * Runs all loaders and then copies the loaded file descriptors into every
     * partition builder whose files changed. Builders whose files are
     * unchanged are left untouched.
     *
     * @throws TableLoadingException if loading failed for at least one path
     */
    void load() throws TableLoadingException {
        loadInternal();

        for (Map.Entry<Path, List<HdfsPartition.Builder>> e : partsByPath_.entrySet()) {
            FileMetadataLoader loader = loaders_.get(e.getKey());
            for (HdfsPartition.Builder partBuilder : e.getValue()) {
                if (!loader.hasFilesChangedCompareTo(partBuilder.getFileDescriptors())) {
                    LOG.trace("Detected files unchanged on partition {}", partBuilder.getPartitionName());
                    continue;
                }
                partBuilder.clearFileDescriptors();
                List<HdfsPartition.FileDescriptor> deleteDescriptors = loader.getLoadedDeleteDeltaFds();
                if (deleteDescriptors != null && !deleteDescriptors.isEmpty()) {
                    // Delete deltas are present: store insert and delete descriptors separately.
                    partBuilder.setInsertFileDescriptors(loader.getLoadedInsertDeltaFds());
                    partBuilder.setDeleteFileDescriptors(deleteDescriptors);
                } else {
                    partBuilder.setFileDescriptors(loader.getLoadedFds());
                }
            }
        }
    }

    /**
     * Submits every loader to an executor and waits for all of them to finish.
     * Failures are counted, and at most
     * {@link #MAX_PATH_METADATA_LOADING_ERRORS_TO_LOG} of them are logged
     * individually. If the waiting thread is interrupted, the remaining
     * futures are still awaited and the interrupt status is restored before
     * this method exits.
     *
     * @throws TableLoadingException if loading failed for at least one path
     */
    private void loadInternal() throws TableLoadingException {
        if (loaders_.isEmpty()) {
            return;
        }

        int failedLoadTasks = 0;
        boolean interrupted = false;
        int poolSize = getPoolSize(loaders_.size(), fs_);
        ExecutorService pool = createPool(poolSize, logPrefix_);
        // Both counters are bumped before entering the try so that the
        // decrements in the finally block are always balanced.
        TOTAL_THREADS.addAndGet(poolSize);
        TOTAL_TABLES.incrementAndGet();
        try (ThreadNameAnnotator tna = new ThreadNameAnnotator(logPrefix_)) {
            List<Pair<FileMetadataLoader, Future<Void>>> futures = new ArrayList<>(loaders_.size());
            for (FileMetadataLoader loader : loaders_.values()) {
                Future<Void> future = pool.submit(() -> {
                    loader.load();
                    return null;
                });
                futures.add(new Pair<FileMetadataLoader, Future<Void>>(loader, future));
            }

            for (Pair<FileMetadataLoader, Future<Void>> task : futures) {
                try {
                    task.second.get();
                } catch (InterruptedException | ExecutionException e) {
                    if (e instanceof InterruptedException) {
                        // Remember the interrupt; it is re-asserted in the finally block
                        // so that waiting on the remaining futures is not cut short.
                        interrupted = true;
                    }
                    if (++failedLoadTasks <= MAX_PATH_METADATA_LOADING_ERRORS_TO_LOG) {
                        LOG.error(logPrefix_ + " encountered an error loading data for path " + task.first.getPartDir(), e);
                    }
                }
            }
        } finally {
            pool.shutdown();
            TOTAL_THREADS.addAndGet(-poolSize);
            TOTAL_TABLES.addAndGet(-1);
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }

        if (failedLoadTasks > 0) {
            if (failedLoadTasks > MAX_PATH_METADATA_LOADING_ERRORS_TO_LOG) {
                LOG.error("{} error loading {} paths. Only the first {} errors were logged", logPrefix_, failedLoadTasks, MAX_PATH_METADATA_LOADING_ERRORS_TO_LOG);
            }
            throw new TableLoadingException(logPrefix_ + ": failed to load " + failedLoadTasks + " paths. Check the catalog server log for more details.");
        }
    }

    /**
     * Creates the executor used to run the loaders. A pool size of 1 yields a
     * direct executor service; larger sizes yield a fixed thread pool.
     *
     * @param poolSize number of threads; must be positive
     * @param logPrefix prefix for the informational log line
     * @return the executor; the caller is responsible for shutting it down
     * @throws IllegalStateException if {@code poolSize} is not positive
     */
    public static ExecutorService createPool(int poolSize, String logPrefix) {
        // Guava's Preconditions only substitutes %s placeholders, not {}.
        Preconditions.checkState(poolSize > 0, "Illegal poolSize: %s", poolSize);
        if (poolSize == 1) {
            return MoreExecutors.newDirectExecutorService();
        }
        LOG.info("{} using a thread pool of size {}", logPrefix, poolSize);
        return Executors.newFixedThreadPool(poolSize);
    }

    /**
     * Returns the pool size to use for the given number of loaders: the
     * configured maximum for the file system type, capped at
     * {@code numLoaders}.
     *
     * @param numLoaders number of loaders that will be run
     * @param fs file system; selects the HDFS maximum when
     *     {@code FileSystemUtil.supportsStorageIds(fs)} is true, otherwise the
     *     non-HDFS maximum
     * @return the smaller of {@code numLoaders} and the configured maximum
     */
    public static int getPoolSize(int numLoaders, FileSystem fs) {
        int maxParallelism = FileSystemUtil.supportsStorageIds(fs) ? MAX_HDFS_PARTITIONS_PARALLEL_LOAD : MAX_NON_HDFS_PARTITIONS_PARALLEL_LOAD;
        return Math.min(numLoaders, maxParallelism);
    }
}

