package org.apache.flink.runtime.filecache;

import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.blob.PermanentBlobService;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.ShutdownHookUtil;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/filecache/FileCache.class */
public class FileCache {
    /** Logger for this class; also used by {@code DeleteProcess} to report failed deletions. */
    private static final Logger LOG = LoggerFactory.getLogger(FileCache.class);
    /** Monitor held while {@link #entries}, {@link #jobRefHolders} and {@link #nextDirectory} are read or modified. */
    private final Object lock;
    /** Per job: the copy futures requested so far, keyed by the name passed to {@code createTmpFile}. */
    private final Map<JobID, Map<String, Future<Path>>> entries;
    /** Per job: the execution attempts registered through {@code createTmpFile} and not yet removed by {@code releaseJob}. */
    private final Map<JobID, Set<ExecutionAttemptID>> jobRefHolders;
    /** Executes the copy tasks and the delayed {@code DeleteProcess} clean-ups; shut down in {@code shutdown()}. */
    private final ScheduledExecutorService executorService;
    /** One {@code flink-dist-cache-<UUID>} directory created under each base directory given to the constructor. */
    private final File[] storageDirectories;
    /** Thread returned by {@code ShutdownHookUtil.addShutdownHook}; passed to {@code removeShutdownHook} in {@code shutdown()}. */
    private final Thread shutdownHook;
    /** Index into {@link #storageDirectories} used for the next new cache entry; wraps back to 0 at the end of the array. */
    private int nextDirectory;
    /** Blob service handed to {@code CopyFromBlobProcess} for cache entries that carry a blob key. */
    private final PermanentBlobService blobService;
    /**
     * Interval in milliseconds: the delay before a {@code DeleteProcess} runs after the last holder
     * of a job was released, and the maximum wait for executor termination in {@code shutdown()}.
     */
    private final long cleanupInterval;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/filecache/FileCache$CopyFromBlobProcess.class */
    /**
     * Copy task for cache entries that are backed by a blob.
     *
     * <p>The blob is fetched through the {@link PermanentBlobService}; what happens next depends on
     * the entry's {@code isZipped} flag (see {@link #call()}).
     */
    public static class CopyFromBlobProcess implements Callable<Path> {
        private final PermanentBlobKey blobKey;
        private final Path target;
        private final boolean isDirectory;
        private final boolean isExecutable;
        private final JobID jobID;
        private final PermanentBlobService blobService;

        /**
         * @param distributedCacheEntry entry supplying the executable flag, the zipped flag and the
         *     serialized blob key
         * @param jobID job the blob belongs to
         * @param permanentBlobService service used to fetch the blob file
         * @param path target handed to {@code FileUtils.expandDirectory} for zipped entries
         * @throws Exception if the serialized blob key cannot be deserialized
         */
        CopyFromBlobProcess(DistributedCache.DistributedCacheEntry distributedCacheEntry, JobID jobID, PermanentBlobService permanentBlobService, Path path) throws Exception {
            // Unboxing comes first, exactly as before, so a null flag fails before deserialization.
            this.isExecutable = distributedCacheEntry.isExecutable;
            this.isDirectory = distributedCacheEntry.isZipped;
            this.jobID = jobID;
            this.blobService = permanentBlobService;

            final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
            this.blobKey = (PermanentBlobKey) InstantiationUtil.deserializeObject(distributedCacheEntry.blobKey, contextClassLoader);
            this.target = path;
        }

        /**
         * Fetches the blob file and returns the path under which the cached content is available.
         *
         * @return for zipped entries the result of {@code FileUtils.expandDirectory}; otherwise the
         *     local blob file itself, after its executable flag has been set
         * @throws IOException if fetching or expanding the blob fails
         */
        @Override
        public Path call() throws IOException {
            final File localBlob = this.blobService.getFile(this.jobID, this.blobKey);

            if (!this.isDirectory) {
                // The boolean result of setExecutable is intentionally not inspected.
                localBlob.setExecutable(this.isExecutable);
                return Path.fromLocalFile(localBlob);
            }

            final Path archive = new Path(localBlob.getAbsolutePath());
            return FileUtils.expandDirectory(archive, this.target);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/filecache/FileCache$CopyFromDFSProcess.class */
    /**
     * Copy task for cache entries that are addressed by a file path rather than a blob key.
     *
     * <p>The source is copied into the given directory under its last path segment and, for zipped
     * entries, expanded next to the copy.
     */
    public static class CopyFromDFSProcess implements Callable<Path> {
        private final Path filePath;
        private final Path cachedPath;
        private final boolean executable;
        private final boolean isZipped;

        /**
         * @param distributedCacheEntry entry supplying the source path and the executable/zipped flags
         * @param path directory in which the local copy is placed
         */
        public CopyFromDFSProcess(DistributedCache.DistributedCacheEntry distributedCacheEntry, Path path) {
            final String source = distributedCacheEntry.filePath;
            this.filePath = new Path(source);
            this.executable = distributedCacheEntry.isExecutable;
            this.isZipped = distributedCacheEntry.isZipped;

            // Keep only the text after the last '/'; a separator at index 0 (or none) leaves the
            // string untouched.
            final int separatorIndex = source.lastIndexOf("/");
            String fileName = source;
            if (separatorIndex > 0) {
                fileName = source.substring(separatorIndex + 1);
            }
            this.cachedPath = new Path(path, fileName);
        }

        /**
         * Copies the source to the cached location.
         *
         * @return the cached path, or for zipped entries the result of expanding it into its parent
         * @throws IOException if copying or expanding fails
         */
        @Override
        public Path call() throws IOException {
            FileUtils.copy(this.filePath, this.cachedPath, this.executable);
            if (!this.isZipped) {
                return this.cachedPath;
            }
            return FileUtils.expandDirectory(this.cachedPath, this.cachedPath.getParent());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Clean-up task scheduled by {@code releaseJob} once the last holder of a job is released.
     *
     * <p>When it runs it re-checks, under the cache lock, that the job still has no holders. Only
     * then does it cancel the job's copy futures, drop the job's bookkeeping and delete the job's
     * sub-directory from every storage directory.
     */
    @VisibleForTesting
    public class DeleteProcess implements Runnable {
        private final JobID jobID;

        /**
         * @param jobID job whose cached files should be removed
         */
        DeleteProcess(JobID jobID) {
            this.jobID = jobID;
        }

        @Override
        public void run() {
            synchronized (FileCache.this.lock) {
                final Set<ExecutionAttemptID> holders = FileCache.this.jobRefHolders.get(this.jobID);
                // Unknown job, or a new holder registered while this task was waiting: nothing to do.
                if (holders == null || !holders.isEmpty()) {
                    return;
                }

                // Abort copies that may still be running before their target directory disappears.
                final Map<String, Future<Path>> jobEntries = FileCache.this.entries.remove(this.jobID);
                if (jobEntries != null) {
                    for (Future<Path> copy : jobEntries.values()) {
                        copy.cancel(true);
                    }
                }
                FileCache.this.jobRefHolders.remove(this.jobID);

                // Handle failures per directory so one undeletable directory does not prevent the
                // clean-up of the remaining ones.
                for (File storageDirectory : FileCache.this.storageDirectories) {
                    try {
                        FileUtils.deleteDirectory(new File(storageDirectory, this.jobID.toString()));
                    } catch (IOException e) {
                        FileCache.LOG.error("Could not delete file from local file cache.", e);
                    }
                }
            }
        }
    }

    /**
     * Creates a file cache with a default executor and clean-up interval.
     *
     * <p>Delegates to the four-argument constructor with a scheduled thread pool of 10 threads whose
     * thread factory is {@code new ExecutorThreadFactory("flink-file-cache")}.
     *
     * <p>NOTE(review): the clean-up interval is read from
     * {@code ConfigConstants.DEFAULT_TASK_MANAGER_DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS}, whose name has
     * nothing to do with file caching. This looks like a literal that the decompiler mapped onto
     * an arbitrary constant with the same value; confirm the intended interval.
     *
     * @param strArr base directories under which the cache creates its storage directories
     * @param permanentBlobService blob service used for cache entries that carry a blob key
     * @throws IOException if one of the storage directories cannot be created
     */
    public FileCache(String[] strArr, PermanentBlobService permanentBlobService) throws IOException {
        this(strArr, permanentBlobService, Executors.newScheduledThreadPool(10, new ExecutorThreadFactory("flink-file-cache")), ConfigConstants.DEFAULT_TASK_MANAGER_DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS);
    }

    /**
     * Creates a file cache that uses the given executor and clean-up interval.
     *
     * <p>For every base directory a fresh {@code flink-dist-cache-<UUID>} sub-directory is created.
     * If one of them cannot be created, the directories created so far are removed again and an
     * {@link IOException} is thrown.
     *
     * @param strArr base directories under which the storage directories are created; must not be null
     * @param permanentBlobService blob service used for cache entries that carry a blob key
     * @param scheduledExecutorService executor that runs copy and delayed delete tasks
     * @param j clean-up interval in milliseconds
     * @throws IOException if a storage directory cannot be created
     */
    @VisibleForTesting
    FileCache(String[] strArr, PermanentBlobService permanentBlobService, ScheduledExecutorService scheduledExecutorService, long j) throws IOException {
        Preconditions.checkNotNull(strArr);
        this.lock = new Object();
        this.cleanupInterval = j;
        this.entries = new HashMap<>();
        this.jobRefHolders = new HashMap<>();
        this.executorService = scheduledExecutorService;
        this.blobService = permanentBlobService;

        this.storageDirectories = new File[strArr.length];
        for (int i = 0; i < strArr.length; i++) {
            this.storageDirectories[i] = new File(strArr[i], "flink-dist-cache-" + UUID.randomUUID());
            String absolutePath = this.storageDirectories[i].getAbsolutePath();
            if (!this.storageDirectories[i].mkdirs()) {
                LOG.error("User file cache cannot create directory " + absolutePath);
                // Roll back the (still empty) directories created in earlier iterations.
                for (int created = 0; created < i; created++) {
                    if (!this.storageDirectories[created].delete()) {
                        LOG.warn("User file cache cannot remove prior directory " + this.storageDirectories[created].getAbsolutePath());
                    }
                }
                throw new IOException("File cache cannot create temp storage directory: " + absolutePath);
            }
            LOG.info("User file cache uses directory " + absolutePath);
        }

        // Register the hook last: it publishes 'this' to another thread, and shutdown() touches
        // entries/jobRefHolders, so every field must be assigned before the hook can possibly run.
        this.shutdownHook = createShutdownHook(this, LOG);
    }

    /**
     * Shuts the cache down and removes everything it stored on disk.
     *
     * <p>Under the cache lock this method shuts the executor down and waits up to the clean-up
     * interval for it to terminate. It then clears all bookkeeping, deletes every storage directory
     * (failures are logged, not thrown) and unregisters the shutdown hook.
     *
     * <p>If the calling thread is interrupted while waiting for the executor, the clean-up still
     * runs to completion and the thread's interrupt status is restored afterwards.
     */
    public void shutdown() {
        synchronized (this.lock) {
            boolean interrupted = false;
            try {
                final ScheduledExecutorService executor = this.executorService;
                if (executor != null) {
                    executor.shutdown();
                    try {
                        // The result is not inspected: clean-up proceeds whether or not all tasks finished.
                        executor.awaitTermination(this.cleanupInterval, TimeUnit.MILLISECONDS);
                    } catch (InterruptedException e) {
                        // Remember the interrupt instead of dropping it; it is re-asserted only after
                        // the best-effort clean-up so that the file deletions below are not disturbed.
                        interrupted = true;
                    }
                }

                this.entries.clear();
                this.jobRefHolders.clear();

                for (File storageDirectory : this.storageDirectories) {
                    try {
                        FileUtils.deleteDirectory(storageDirectory);
                        LOG.info("removed file cache directory {}", storageDirectory.getAbsolutePath());
                    } catch (IOException e) {
                        LOG.error("File cache could not properly clean up storage directory: {}", storageDirectory.getAbsolutePath(), e);
                    }
                }

                ShutdownHookUtil.removeShutdownHook(this.shutdownHook, getClass().getSimpleName(), LOG);
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /**
     * Registers an execution attempt as a user of a cached file and returns the future of its local copy.
     *
     * <p>The attempt is always added to the job's holders. If a copy for {@code str} was already
     * requested for this job, its existing future is returned. Otherwise the next storage directory
     * is chosen in round-robin order, a copy task is submitted to the executor and its future is
     * remembered for later callers.
     *
     * @param str name under which the entry is cached for the job
     * @param distributedCacheEntry entry describing the file; a non-null {@code blobKey} selects the
     *     blob-based copy, otherwise the path-based copy is used
     * @param jobID job the file belongs to
     * @param executionAttemptID execution attempt that will use the file
     * @return future that completes with the local path of the cached file
     * @throws Exception if the blob-based copy task cannot be created
     */
    public Future<Path> createTmpFile(String str, DistributedCache.DistributedCacheEntry distributedCacheEntry, JobID jobID, ExecutionAttemptID executionAttemptID) throws Exception {
        synchronized (this.lock) {
            final Map<String, Future<Path>> jobEntries = this.entries.computeIfAbsent(jobID, ignoredJobId -> new HashMap<>());

            // Register the holder even when the copy already exists, so that the job's files are
            // only cleaned up after this attempt has been released as well.
            final Set<ExecutionAttemptID> holders = this.jobRefHolders.computeIfAbsent(jobID, ignoredJobId -> new HashSet<>());
            holders.add(executionAttemptID);

            final Future<Path> existing = jobEntries.get(str);
            if (existing != null) {
                return existing;
            }

            // Pick the storage directory for this entry, then advance the round-robin index.
            final File jobDirectory = new File(this.storageDirectories[this.nextDirectory++], jobID.toString());
            if (this.nextDirectory >= this.storageDirectories.length) {
                this.nextDirectory = 0;
            }
            final Path jobDirectoryPath = new Path(jobDirectory.getAbsolutePath());

            final Callable<Path> copyProcess;
            if (distributedCacheEntry.blobKey != null) {
                copyProcess = new CopyFromBlobProcess(distributedCacheEntry, jobID, this.blobService, jobDirectoryPath);
            } else {
                copyProcess = new CopyFromDFSProcess(distributedCacheEntry, jobDirectoryPath);
            }

            final FutureTask<Path> copyTask = new FutureTask<>(copyProcess);
            this.executorService.submit(copyTask);
            jobEntries.put(str, copyTask);
            return copyTask;
        }
    }

    /**
     * Registers a shutdown hook that calls {@link #shutdown()} on the given cache.
     *
     * <p>Evaluating the bound method reference already fails with a
     * {@link NullPointerException} for a null {@code fileCache}, so no separate null probe is needed.
     *
     * @param fileCache cache to shut down when the hook runs
     * @param logger logger passed on to {@code ShutdownHookUtil}
     * @return the value returned by {@code ShutdownHookUtil.addShutdownHook}
     */
    private static Thread createShutdownHook(FileCache fileCache, Logger logger) {
        final String serviceName = FileCache.class.getSimpleName();
        return ShutdownHookUtil.addShutdownHook(fileCache::shutdown, serviceName, logger);
    }

    /**
     * Removes an execution attempt from the holders of a job's cached files.
     *
     * <p>When the removed attempt was the last holder, a {@code DeleteProcess} for the job is
     * scheduled to run after the clean-up interval. Jobs that are unknown or already have no
     * holders are ignored.
     *
     * @param jobID job whose files are released; must not be null
     * @param executionAttemptID execution attempt that no longer needs the files
     */
    public void releaseJob(JobID jobID, ExecutionAttemptID executionAttemptID) {
        Preconditions.checkNotNull(jobID);

        synchronized (this.lock) {
            final Set<ExecutionAttemptID> holders = this.jobRefHolders.get(jobID);
            if (holders != null && !holders.isEmpty()) {
                holders.remove(executionAttemptID);
                if (holders.isEmpty()) {
                    // Deletion is delayed rather than immediate; the task re-checks the holders when it runs.
                    this.executorService.schedule(new DeleteProcess(jobID), this.cleanupInterval, TimeUnit.MILLISECONDS);
                }
            }
        }
    }
}
