/*
 * Decompiled with CFR 0.152.
 */
package org.apache.impala.catalog;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.iceberg.ContentFile;
import org.apache.impala.catalog.CatalogException;
import org.apache.impala.catalog.FeIcebergTable;
import org.apache.impala.catalog.FileMetadataLoader;
import org.apache.impala.catalog.HdfsFileFormat;
import org.apache.impala.catalog.HdfsPartition;
import org.apache.impala.catalog.ParallelFileMetadataLoader;
import org.apache.impala.catalog.iceberg.GroupedContentFiles;
import org.apache.impala.common.FileSystemUtil;
import org.apache.impala.common.Reference;
import org.apache.impala.service.BackendConfig;
import org.apache.impala.thrift.TNetworkAddress;
import org.apache.impala.util.ListMap;
import org.apache.impala.util.ThreadNameAnnotator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class IcebergFileMetadataLoader
extends FileMetadataLoader {
    private static final Logger LOG = LoggerFactory.getLogger(IcebergFileMetadataLoader.class);
    private static final Configuration CONF = new Configuration();
    private final int NEW_FILES_THRESHOLD_DEFAULT = 100;
    private final int newFilesThreshold_;
    private final GroupedContentFiles icebergFiles_;
    private final boolean canDataBeOutsideOfTableLocation_;

    public IcebergFileMetadataLoader(Path partDir, boolean recursive, List<HdfsPartition.FileDescriptor> oldFds, ListMap<TNetworkAddress> hostIndex, ValidTxnList validTxnList, ValidWriteIdList writeIds, GroupedContentFiles icebergFiles, boolean canDataBeOutsideOfTableLocation) {
        this(partDir, recursive, oldFds, hostIndex, validTxnList, writeIds, icebergFiles, canDataBeOutsideOfTableLocation, BackendConfig.INSTANCE.icebergReloadNewFilesThreshold());
    }

    public IcebergFileMetadataLoader(Path partDir, boolean recursive, List<HdfsPartition.FileDescriptor> oldFds, ListMap<TNetworkAddress> hostIndex, ValidTxnList validTxnList, ValidWriteIdList writeIds, GroupedContentFiles icebergFiles, boolean canDataBeOutsideOfTableLocation, int newFilesThresholdParam) {
        super(partDir, recursive, oldFds, hostIndex, validTxnList, writeIds, HdfsFileFormat.ICEBERG);
        this.icebergFiles_ = icebergFiles;
        this.canDataBeOutsideOfTableLocation_ = canDataBeOutsideOfTableLocation;
        if (newFilesThresholdParam >= 0) {
            this.newFilesThreshold_ = newFilesThresholdParam;
        } else {
            this.newFilesThreshold_ = 100;
            LOG.warn("Ignoring invalid new files threshold: {} using value: {}", (Object)newFilesThresholdParam, (Object)this.newFilesThreshold_);
        }
    }

    @Override
    public void load() throws CatalogException, IOException {
        if (!this.shouldReuseOldFds()) {
            super.load();
        } else {
            try {
                this.reloadWithOldFds();
            }
            finally {
                FileMetadataLoader.TOTAL_TASKS.decrementAndGet();
            }
        }
    }

    private void reloadWithOldFds() throws IOException {
        this.loadStats_ = new FileMetadataLoader.LoadStats(this.partDir_);
        FileSystem fs = this.partDir_.getFileSystem(CONF);
        String msg = String.format("Refreshing Iceberg file metadata from path %s while reusing old file descriptors", this.partDir_);
        LOG.trace(msg);
        try (ThreadNameAnnotator tna = new ThreadNameAnnotator(msg);){
            this.loadedFds_ = new ArrayList();
            Reference<Long> numUnknownDiskIds = new Reference<Long>(0L);
            for (ContentFile<?> contentFile : this.icebergFiles_.getAllContentFiles()) {
                HdfsPartition.FileDescriptor fd = this.getOldFd(contentFile);
                if (fd == null) {
                    fd = this.getFileDescriptor(fs, contentFile, numUnknownDiskIds);
                } else {
                    ++this.loadStats_.skippedFiles;
                }
                this.loadedFds_.add(Preconditions.checkNotNull((Object)fd));
            }
            Preconditions.checkState((this.loadStats_.loadedFiles <= this.newFilesThreshold_ ? 1 : 0) != 0);
            this.loadStats_.unknownDiskIds = (int)((long)this.loadStats_.unknownDiskIds + numUnknownDiskIds.getRef());
            if (LOG.isTraceEnabled()) {
                LOG.trace(this.loadStats_.debugString());
            }
        }
    }

    private HdfsPartition.FileDescriptor getFileDescriptor(FileSystem fs, ContentFile<?> contentFile, Reference<Long> numUnknownDiskIds) throws IOException {
        Path fileLoc = FileSystemUtil.createFullyQualifiedPath(new Path(contentFile.path().toString()));
        FileStatus stat = FileSystemUtil.supportsStorageIds(fs) ? FeIcebergTable.Utils.createLocatedFileStatus(fileLoc, fs) : FeIcebergTable.Utils.createFileStatus(contentFile, fileLoc);
        return this.getFileDescriptor(fs, FileSystemUtil.supportsStorageIds(fs), numUnknownDiskIds, stat);
    }

    @Override
    protected HdfsPartition.FileDescriptor getFileDescriptor(FileSystem fs, boolean listWithLocations, Reference<Long> numUnknownDiskIds, FileStatus fileStatus) throws IOException {
        String absPath = null;
        String relPath = FileSystemUtil.relativizePathNoThrow(fileStatus.getPath(), this.partDir_);
        if (relPath == null) {
            if (this.canDataBeOutsideOfTableLocation_) {
                absPath = fileStatus.getPath().toString();
            } else {
                throw new IOException(String.format("Failed to load Iceberg datafile %s, because it's outside of the table location", fileStatus.getPath().toUri()));
            }
        }
        String path = Strings.isNullOrEmpty((String)relPath) ? absPath : relPath;
        HdfsPartition.FileDescriptor fd = (HdfsPartition.FileDescriptor)this.oldFdsByPath_.get((Object)path);
        if (listWithLocations || this.forceRefreshLocations || fd == null || fd.isChanged(fileStatus)) {
            fd = this.createFd(fs, fileStatus, relPath, numUnknownDiskIds, absPath);
            ++this.loadStats_.loadedFiles;
        } else {
            ++this.loadStats_.skippedFiles;
        }
        return fd;
    }

    @Override
    protected List<FileStatus> getFileStatuses(FileSystem fs, boolean listWithLocations) throws IOException {
        if (this.icebergFiles_.isEmpty()) {
            return null;
        }
        Map<Object, Object> nameToFileStatus = Collections.emptyMap();
        if (listWithLocations) {
            nameToFileStatus = this.parallelListing(fs);
        }
        LinkedList stats = Lists.newLinkedList();
        for (ContentFile<?> contentFile : this.icebergFiles_.getAllContentFiles()) {
            Path path = FileSystemUtil.createFullyQualifiedPath(new Path(contentFile.path().toString()));
            if (nameToFileStatus.containsKey(path)) {
                stats.add(nameToFileStatus.get(path));
                continue;
            }
            FileSystem fsForPath = FileSystemUtil.getFileSystemForPath(path);
            if (FileSystemUtil.supportsStorageIds(fsForPath)) {
                stats.add(FeIcebergTable.Utils.createLocatedFileStatus(path, fsForPath));
                continue;
            }
            stats.add(FeIcebergTable.Utils.createFileStatus(contentFile, path));
        }
        return stats;
    }

    private Map<Path, FileStatus> parallelListing(FileSystem fs) throws IOException {
        String logPrefix = "Parallel Iceberg file metadata listing";
        int poolSize = ParallelFileMetadataLoader.getPoolSize(this.icebergFiles_.size(), fs);
        ExecutorService pool = ParallelFileMetadataLoader.createPool(poolSize, logPrefix);
        ParallelFileMetadataLoader.TOTAL_THREADS.addAndGet(poolSize);
        ConcurrentMap nameToFileStatus = Maps.newConcurrentMap();
        try (ThreadNameAnnotator tna = new ThreadNameAnnotator(logPrefix);){
            Set<Path> partitionPaths = this.icebergFilesByPartition();
            TOTAL_TASKS.addAndGet(partitionPaths.size());
            List tasks = partitionPaths.stream().map(path -> pool.submit(() -> {
                try {
                    Void void_ = this.listingTask(fs, (Path)path, nameToFileStatus);
                    return void_;
                }
                finally {
                    TOTAL_TASKS.decrementAndGet();
                }
            })).collect(Collectors.toList());
            for (Future task : tasks) {
                task.get();
            }
        }
        catch (InterruptedException | ExecutionException e) {
            throw new IOException(String.format("%s: failed to load paths.", logPrefix), e);
        }
        finally {
            ParallelFileMetadataLoader.TOTAL_THREADS.addAndGet(-poolSize);
            pool.shutdown();
        }
        return nameToFileStatus;
    }

    private Set<Path> icebergFilesByPartition() {
        return StreamSupport.stream(this.icebergFiles_.getAllContentFiles().spliterator(), false).map(contentFile -> new Path(String.valueOf(contentFile.path())).getParent()).collect(Collectors.toSet());
    }

    private Void listingTask(FileSystem fs, Path partitionPath, Map<Path, FileStatus> nameToFileStatus) throws IOException {
        RemoteIterator<? extends FileStatus> remoteIterator = FileSystemUtil.listFiles(fs, partitionPath, this.recursive_, this.debugAction_);
        HashMap<Path, FileStatus> perThreadMapping = new HashMap<Path, FileStatus>();
        while (remoteIterator.hasNext()) {
            FileStatus status = (FileStatus)remoteIterator.next();
            perThreadMapping.put(status.getPath(), status);
        }
        nameToFileStatus.putAll(perThreadMapping);
        return null;
    }

    @VisibleForTesting
    boolean shouldReuseOldFds() throws IOException {
        if (this.oldFdsByPath_ == null || this.oldFdsByPath_.isEmpty()) {
            return false;
        }
        if (this.forceRefreshLocations) {
            return false;
        }
        int oldFdsSize = this.oldFdsByPath_.size();
        int iceContentFilesSize = this.icebergFiles_.size();
        if (iceContentFilesSize - oldFdsSize > this.newFilesThreshold_) {
            LOG.trace("There are at least {} new files under path {}.", (Object)(iceContentFilesSize - oldFdsSize), (Object)this.partDir_);
            return false;
        }
        int newFiles = 0;
        for (ContentFile<?> contentFile : this.icebergFiles_.getAllContentFiles()) {
            if (this.getOldFd(contentFile) != null || ++newFiles <= this.newFilesThreshold_) continue;
            LOG.trace("There are at least {} new files under path {}.", (Object)newFiles, (Object)this.partDir_);
            return false;
        }
        LOG.trace("There are only {} new files under path {}.", (Object)newFiles, (Object)this.partDir_);
        return true;
    }

    HdfsPartition.FileDescriptor getOldFd(ContentFile<?> contentFile) throws IOException {
        Path contentFilePath = FileSystemUtil.createFullyQualifiedPath(new Path(contentFile.path().toString()));
        String lookupPath = FileSystemUtil.relativizePathNoThrow(contentFilePath, this.partDir_);
        if (lookupPath == null) {
            if (this.canDataBeOutsideOfTableLocation_) {
                lookupPath = contentFilePath.toString();
            } else {
                throw new IOException(String.format("Failed to load Iceberg datafile %s, because it's outside of the table location", contentFilePath));
            }
        }
        return (HdfsPartition.FileDescriptor)this.oldFdsByPath_.get((Object)lookupPath);
    }
}

