/*
 * Decompiled with CFR 0.152.
 */
package org.apache.impala.catalog;

import com.codahale.metrics.Clock;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.Table;
import org.apache.impala.catalog.CatalogException;
import org.apache.impala.catalog.FeFsTable;
import org.apache.impala.catalog.FeIcebergTable;
import org.apache.impala.catalog.FileDescriptor;
import org.apache.impala.catalog.FileMetadataLoader;
import org.apache.impala.catalog.HdfsFileFormat;
import org.apache.impala.catalog.IcebergContentFileStore;
import org.apache.impala.catalog.IcebergFileDescriptor;
import org.apache.impala.catalog.ParallelFileMetadataLoader;
import org.apache.impala.catalog.TableLoadingException;
import org.apache.impala.catalog.iceberg.GroupedContentFiles;
import org.apache.impala.common.FileSystemUtil;
import org.apache.impala.common.Pair;
import org.apache.impala.common.PrintUtils;
import org.apache.impala.thrift.TIcebergPartition;
import org.apache.impala.thrift.TNetworkAddress;
import org.apache.impala.util.IcebergUtil;
import org.apache.impala.util.ListMap;
import org.apache.impala.util.ThreadNameAnnotator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class IcebergFileMetadataLoader
extends FileMetadataLoader {
    private static final Logger LOG = LoggerFactory.getLogger(IcebergFileMetadataLoader.class);

    // Iceberg table handle; used to derive partition info and per-file Iceberg metadata.
    private final Table iceTbl_;

    // Fully qualified form of the table location reported by iceTbl_.
    private final Path tablePath_;

    // The Iceberg content files that file descriptors are loaded for.
    private final GroupedContentFiles icebergFiles_;

    // Partition list of the previous load; the partition id stored in a reused (old) file
    // descriptor is used as an index into this list.
    private final List<TIcebergPartition> oldIcebergPartitions_;

    // Source of the partition ids handed out during loading. The reference is never
    // reassigned, hence final; the counter itself is advanced atomically.
    private final AtomicInteger nextPartitionId_ = new AtomicInteger(0);

    // Partition -> assigned id. Recreated on every load; it is also updated from the worker
    // threads used for parallel listing, which is why a concurrent map is used.
    private ConcurrentHashMap<TIcebergPartition, Integer> loadedIcebergPartitions_;

    // When true, a content file located outside of the table location fails the load with a
    // TableLoadingException; when false such files are tracked by their absolute path.
    private final boolean requiresDataFilesInTableLocation_;

    public IcebergFileMetadataLoader(Table iceTbl, Iterable<IcebergFileDescriptor> oldFds, ListMap<TNetworkAddress> hostIndex, GroupedContentFiles icebergFiles, List<TIcebergPartition> partitions, boolean requiresDataFilesInTableLocation) {
        super(iceTbl.location(), true, oldFds, hostIndex, null, null, HdfsFileFormat.ICEBERG);
        this.iceTbl_ = iceTbl;
        this.tablePath_ = FileSystemUtil.createFullyQualifiedPath(new Path(iceTbl.location()));
        this.icebergFiles_ = icebergFiles;
        this.oldIcebergPartitions_ = partitions;
        this.requiresDataFilesInTableLocation_ = requiresDataFilesInTableLocation;
    }

    /**
     * Loads the Iceberg file metadata. The calling thread's name is annotated with a
     * description of the work while loading runs, and the shared task counter is
     * decremented when loading ends, whether it succeeded or threw.
     *
     * @throws CatalogException if loading fails with a catalog error
     * @throws IOException if loading fails with an I/O error
     */
    @Override
    public void load() throws CatalogException, IOException {
        String description =
                String.format("Refreshing Iceberg file metadata from path %s", partDir_);
        LOG.trace(description);
        try (ThreadNameAnnotator ignored = new ThreadNameAnnotator(description)) {
            loadInternal();
        } finally {
            FileMetadataLoader.TOTAL_TASKS.decrementAndGet();
        }
    }

    /**
     * Returns the loaded file descriptors as a new list of {@link IcebergFileDescriptor}s.
     *
     * @throws IllegalStateException if no load has populated the descriptor list yet
     */
    public List<IcebergFileDescriptor> getLoadedIcebergFds() {
        boolean hasLoaded = loadedFds_ != null;
        Preconditions.checkState(hasLoaded, (Object) "Must have successfully loaded first");
        return loadedFds_.stream()
                .map(IcebergFileDescriptor.class::cast)
                .collect(Collectors.toList());
    }

    /**
     * Returns a read-only view of the partition-to-id map built by the last load.
     *
     * @throws NullPointerException if the map has not been created yet
     */
    public Map<TIcebergPartition, Integer> getIcebergPartitions() {
        Map<TIcebergPartition, Integer> loaded =
                Preconditions.checkNotNull(loadedIcebergPartitions_);
        return Collections.unmodifiableMap(loaded);
    }

    /**
     * Returns the loaded partitions as a list, obtained by converting the map returned by
     * {@link #getIcebergPartitions()}.
     */
    public List<TIcebergPartition> getIcebergPartitionList() {
        Map<TIcebergPartition, Integer> partitionToId = getIcebergPartitions();
        return IcebergContentFileStore.convertPartitionMapToList(partitionToId);
    }

    /**
     * Performs the actual load: resets the per-load state, reuses old descriptors where
     * possible, and creates descriptors for the remaining content files. Files on file
     * systems that support storage ids are collected and handled by the parallel listing;
     * all other files get a descriptor created directly from the content file.
     */
    private void loadInternal() throws CatalogException, IOException {
        // Start from a clean slate; the results of an earlier load are discarded.
        loadedFds_ = new ArrayList<>();
        loadedIcebergPartitions_ = new ConcurrentHashMap<>();
        loadStats_ = new FileMetadataLoader.LoadStats(partDir_);
        fileMetadataStats_ = new FeFsTable.FileMetadataStats();

        // Content files that could not be served from an old file descriptor.
        Iterable<ContentFile<?>> filesToLoad = loadContentFilesWithOldFds(tablePath_);
        List<ContentFile<?>> locatableFiles = Lists.newArrayList();
        FileSystem tableFs = FileSystemUtil.getFileSystemForPath(tablePath_);
        FileSystem defaultFs = FileSystemUtil.getDefaultFileSystem();
        AtomicLong unknownDiskIdCount = new AtomicLong();

        for (ContentFile<?> file : filesToLoad) {
            FileSystem fileFs;
            if (requiresDataFilesInTableLocation_) {
                fileFs = tableFs;
            } else {
                // The file may live anywhere: resolve the file system from its own path and
                // fall back to the default file system when the path carries no scheme.
                Path filePath = new Path(file.path().toString());
                if (filePath.toUri().getScheme() != null) {
                    fileFs = FileSystemUtil.getFileSystemForPath(filePath);
                } else {
                    fileFs = defaultFs;
                }
            }
            if (!FileSystemUtil.supportsStorageIds(fileFs)) {
                registerNewlyLoadedFd(
                        createNonLocatedFd(fileFs, file, tablePath_, unknownDiskIdCount));
            } else {
                locatableFiles.add(file);
            }
        }

        parallelListing(locatableFiles, unknownDiskIdCount).forEach(this::registerNewlyLoadedFd);

        loadStats_.unknownDiskIds += unknownDiskIdCount.get();
        if (LOG.isTraceEnabled()) {
            LOG.trace(loadStats_.debugString());
        }
    }

    /**
     * Records a freshly created descriptor: appends it to the loaded descriptors, adds it to
     * the file metadata statistics and counts it as a loaded file.
     */
    private void registerNewlyLoadedFd(IcebergFileDescriptor fd) {
        loadedFds_.add(fd);
        fileMetadataStats_.accumulate(fd);
        loadStats_.loadedFiles++;
    }

    /**
     * Reuses old file descriptors for the content files that have one and returns the
     * content files that still need a new descriptor.
     *
     * <p>When a location refresh is forced or there are no old descriptors, every content
     * file is returned. Otherwise each reused descriptor gets its partition id rewritten to
     * the id assigned in this load, is counted as skipped and is added to the loaded
     * descriptors.
     *
     * @param partPath path that content file paths are relativized against for the lookup
     * @throws TableLoadingException if a reused descriptor's partition id cannot be rewritten
     */
    private Iterable<ContentFile<?>> loadContentFilesWithOldFds(Path partPath)
            throws TableLoadingException {
        boolean canReuseOldFds = !forceRefreshLocations && !oldFdsByPath_.isEmpty();
        if (!canReuseOldFds) {
            return icebergFiles_.getAllContentFiles();
        }
        List<ContentFile<?>> filesWithoutOldFd = Lists.newArrayList();
        for (ContentFile<?> file : icebergFiles_.getAllContentFiles()) {
            IcebergFileDescriptor oldFd = getOldFd(file, partPath);
            if (oldFd != null) {
                // Translate the partition id of the previous load into the id space of this
                // load, going through the old partition list.
                int previousPartId = oldFd.getFbFileMetadata().icebergMetadata().partId();
                TIcebergPartition partition = oldIcebergPartitions_.get(previousPartId);
                Integer currentPartId = loadedIcebergPartitions_.computeIfAbsent(
                        partition, unused -> nextPartitionId_.getAndIncrement());
                if (!oldFd.getFbFileMetadata().icebergMetadata().mutatePartId(currentPartId)) {
                    throw new TableLoadingException(
                            "Error modifying the Iceberg file descriptor.");
                }
                loadStats_.skippedFiles++;
                loadedFds_.add(oldFd);
                fileMetadataStats_.accumulate(oldFd);
            } else {
                filesWithoutOldFd.add(file);
            }
        }
        return filesWithoutOldFd;
    }

    /**
     * Creates a descriptor for a content file without listing the file system: the file
     * status is synthesized from the content file itself.
     *
     * @param fs file system the file belongs to
     * @param contentFile Iceberg content file to describe
     * @param partPath path the file location is relativized against
     * @param numUnknownDiskIds counter passed on to the descriptor creation
     */
    private IcebergFileDescriptor createNonLocatedFd(FileSystem fs, ContentFile<?> contentFile,
            Path partPath, AtomicLong numUnknownDiskIds) throws CatalogException, IOException {
        Path rawLocation = new Path(contentFile.path().toString());
        Path qualifiedLocation = FileSystemUtil.createFullyQualifiedPath(rawLocation);
        FileStatus syntheticStatus =
                FeIcebergTable.Utils.createFileStatus(contentFile, qualifiedLocation);
        Pair<String, String> paths = getAbsPathRelPath(partPath, syntheticStatus);
        String absolute = paths.first;
        String relative = paths.second;
        int partId = addPartitionInfo(contentFile);
        return IcebergFileDescriptor.cloneWithFileMetadata(
                createFd(fs, syntheticStatus, relative, numUnknownDiskIds, absolute),
                IcebergUtil.createIcebergMetadata(iceTbl_, contentFile, partId));
    }

    /**
     * Creates a descriptor for a content file from a file status obtained by listing.
     *
     * @param fs file system the file belongs to
     * @param contentFile Iceberg content file to describe
     * @param stat listed status of the file; must be a {@link LocatedFileStatus}
     * @param partPath path the file location is relativized against
     * @param numUnknownDiskIds counter passed on to the descriptor creation
     * @throws IllegalStateException if {@code stat} is not a {@link LocatedFileStatus}
     */
    private IcebergFileDescriptor createLocatedFd(FileSystem fs, ContentFile<?> contentFile,
            FileStatus stat, Path partPath, AtomicLong numUnknownDiskIds)
            throws CatalogException, IOException {
        boolean isLocated = stat instanceof LocatedFileStatus;
        Preconditions.checkState(isLocated);
        Pair<String, String> paths = getAbsPathRelPath(partPath, stat);
        String absolute = paths.first;
        String relative = paths.second;
        int partId = addPartitionInfo(contentFile);
        return IcebergFileDescriptor.cloneWithFileMetadata(
                createFd(fs, stat, relative, numUnknownDiskIds, absolute),
                IcebergUtil.createIcebergMetadata(iceTbl_, contentFile, partId));
    }

    /**
     * Returns the id of the partition the content file belongs to, assigning the next free
     * id if the partition has not been seen in this load yet.
     */
    private int addPartitionInfo(ContentFile<?> contentFile) {
        TIcebergPartition key = IcebergUtil.createIcebergPartitionInfo(iceTbl_, contentFile);
        Integer partId = loadedIcebergPartitions_.computeIfAbsent(
                key, unused -> nextPartitionId_.getAndIncrement());
        return partId;
    }

    /**
     * Computes how a file is addressed relative to {@code partPath}.
     *
     * <p>Exactly one element of the returned pair is non-null: the second element holds the
     * path relative to {@code partPath} when the file can be relativized against it;
     * otherwise the first element holds the file's absolute path.
     *
     * @param partPath path the file location is relativized against
     * @param stat status of the file
     * @return pair of (absolute path, relative path)
     * @throws TableLoadingException if the file cannot be relativized against
     *     {@code partPath} and data files are required to be inside the table location
     */
    Pair<String, String> getAbsPathRelPath(Path partPath, FileStatus stat)
            throws TableLoadingException {
        String absPath = null;
        String relPath = FileSystemUtil.relativizePathNoThrow(stat.getPath(), partPath);
        if (relPath == null) {
            if (requiresDataFilesInTableLocation_) {
                throw new TableLoadingException(String.format(
                        "Failed to load Iceberg datafile %s, because it's outside of the table location",
                        stat.getPath().toUri()));
            }
            absPath = stat.getPath().toString();
        }
        // Diamond so the pair is typed Pair<String, String>, matching the declared return type.
        return new Pair<>(absPath, relPath);
    }

    /**
     * Creates file descriptors for the given content files by listing their parent
     * directories in parallel, one task per directory.
     *
     * <p>The pool size is bounded by {@link #getPoolSize(int)}. The shared thread counter is
     * raised by the pool size for the duration of the call, and the shared task counter is
     * raised by the number of directories and lowered by each task as it finishes.
     *
     * @param contentFiles content files to create descriptors for
     * @param numUnknownDiskIds counter passed on to the descriptor creation
     * @return the created descriptors, grouped in the iteration order of the directories;
     *     empty if there is nothing to list
     * @throws IOException if a listing task fails or the calling thread is interrupted while
     *     waiting; in the latter case the thread's interrupt status is restored first
     */
    private List<IcebergFileDescriptor> parallelListing(List<ContentFile<?>> contentFiles,
            AtomicLong numUnknownDiskIds) throws IOException {
        Map<Path, List<ContentFile<?>>> partitionPaths = collectPartitionPaths(contentFiles);
        if (partitionPaths.isEmpty()) {
            return Collections.emptyList();
        }
        List<IcebergFileDescriptor> ret = new ArrayList<>();
        String logPrefix = "Parallel Iceberg file metadata listing";
        int poolSize = getPoolSize(partitionPaths.size());
        ExecutorService pool = ParallelFileMetadataLoader.createPool(poolSize, logPrefix);
        ParallelFileMetadataLoader.TOTAL_THREADS.addAndGet(poolSize);
        try (ThreadNameAnnotator ignored = new ThreadNameAnnotator(logPrefix)) {
            TOTAL_TASKS.addAndGet(partitionPaths.size());
            List<Future<List<IcebergFileDescriptor>>> tasks =
                    new ArrayList<>(partitionPaths.size());
            for (Map.Entry<Path, List<ContentFile<?>>> entry : partitionPaths.entrySet()) {
                Path partitionPath = entry.getKey();
                List<ContentFile<?>> filesInPartition = entry.getValue();
                Future<List<IcebergFileDescriptor>> task = pool.submit(() -> {
                    try {
                        return createFdsForPartition(
                                partitionPath, filesInPartition, numUnknownDiskIds);
                    } finally {
                        // Each task accounts for itself, whether it succeeded or failed.
                        TOTAL_TASKS.decrementAndGet();
                    }
                });
                tasks.add(task);
            }
            for (Future<List<IcebergFileDescriptor>> task : tasks) {
                ret.addAll(task.get());
            }
        } catch (InterruptedException e) {
            // Restore the interrupt status so that code further up the stack can still
            // observe the interruption after it has been translated into an IOException.
            Thread.currentThread().interrupt();
            throw new IOException(String.format("%s: failed to load paths.", logPrefix), e);
        } catch (ExecutionException e) {
            throw new IOException(String.format("%s: failed to load paths.", logPrefix), e);
        } finally {
            ParallelFileMetadataLoader.TOTAL_THREADS.addAndGet(-poolSize);
            pool.shutdown();
        }
        return ret;
    }

    /**
     * Groups the content files by the parent directory of their path and logs how long the
     * grouping took.
     *
     * @return map from parent directory to the content files located directly in it
     */
    private Map<Path, List<ContentFile<?>>> collectPartitionPaths(
            List<ContentFile<?>> contentFiles) {
        Clock timer = Clock.defaultClock();
        long startedAt = timer.getTick();
        Map<Path, List<ContentFile<?>>> byParentDir = contentFiles.stream().collect(
                Collectors.groupingBy(
                        IcebergFileMetadataLoader::parentDirectoryOf,
                        HashMap::new,
                        Collectors.toList()));
        long elapsed = timer.getTick() - startedAt;
        LOG.info("Collected {} Iceberg content files into {} partitions. Duration: {}",
                contentFiles.size(), byParentDir.size(), PrintUtils.printTimeNs(elapsed));
        return byParentDir;
    }

    /** Returns the parent directory of the content file's path. */
    private static Path parentDirectoryOf(ContentFile<?> contentFile) {
        return new Path(String.valueOf(contentFile.path())).getParent();
    }

    /**
     * Returns the number of threads to use for {@code numLoaders} listing tasks: one per
     * task, capped at {@code ParallelFileMetadataLoader.MAX_HDFS_PARTITIONS_PARALLEL_LOAD}.
     */
    private static int getPoolSize(int numLoaders) {
        int cap = ParallelFileMetadataLoader.MAX_HDFS_PARTITIONS_PARALLEL_LOAD;
        return numLoaders <= cap ? numLoaders : cap;
    }

    /**
     * Lists {@code partitionPath} once and creates descriptors for those of the given
     * content files that show up in the listing. A content file missing from the listing is
     * logged as a warning and left out of the result.
     *
     * @param partitionPath directory to list
     * @param contentFiles content files expected under {@code partitionPath}
     * @param numUnknownDiskIds counter passed on to the descriptor creation
     */
    private List<IcebergFileDescriptor> createFdsForPartition(Path partitionPath,
            List<ContentFile<?>> contentFiles, AtomicLong numUnknownDiskIds)
            throws IOException, CatalogException {
        FileSystem fs = FileSystemUtil.getFileSystemForPath(partitionPath);

        // Index the listing by path so each content file is resolved with a single lookup.
        Map<Path, FileStatus> listedByPath = new HashMap<>();
        RemoteIterator<? extends FileStatus> listing =
                FileSystemUtil.listFiles(fs, partitionPath, recursive_, debugAction_);
        while (listing.hasNext()) {
            FileStatus listed = listing.next();
            listedByPath.put(listed.getPath(), listed);
        }

        List<IcebergFileDescriptor> created = new ArrayList<>();
        for (ContentFile<?> file : contentFiles) {
            Path qualified =
                    FileSystemUtil.createFullyQualifiedPath(new Path(file.path().toString()));
            FileStatus listed = listedByPath.get(qualified);
            if (listed != null) {
                created.add(createLocatedFd(fs, file, listed, tablePath_, numUnknownDiskIds));
            } else {
                LOG.warn(String.format(
                        "Failed to load Iceberg content file: '%s', Not found on storage",
                        file.path().toString()));
            }
        }
        return created;
    }

    /**
     * Looks up the old file descriptor of a content file.
     *
     * <p>The lookup key is the file's path relative to {@code partPath}, or its fully
     * qualified path when it cannot be relativized and files outside of the table location
     * are allowed.
     *
     * @param contentFile content file to look up
     * @param partPath path the file location is relativized against
     * @return the old descriptor, or null if there is none for this file
     * @throws TableLoadingException if the file cannot be relativized against
     *     {@code partPath} and data files are required to be inside the table location
     * @throws IllegalStateException if the old descriptor is not an
     *     {@link IcebergFileDescriptor}
     */
    IcebergFileDescriptor getOldFd(ContentFile<?> contentFile, Path partPath)
            throws TableLoadingException {
        Path qualifiedPath =
                FileSystemUtil.createFullyQualifiedPath(new Path(contentFile.path().toString()));
        String key = FileSystemUtil.relativizePathNoThrow(qualifiedPath, partPath);
        if (key == null) {
            if (requiresDataFilesInTableLocation_) {
                throw new TableLoadingException(String.format(
                        "Failed to load Iceberg datafile %s, because it's outside of the table location",
                        qualifiedPath));
            }
            key = qualifiedPath.toString();
        }
        FileDescriptor cached = (FileDescriptor) oldFdsByPath_.get((Object) key);
        if (cached == null) {
            return null;
        }
        Preconditions.checkState(cached instanceof IcebergFileDescriptor);
        return (IcebergFileDescriptor) cached;
    }
}

