/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.fs.tosfs.object;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.io.SequenceInputStream;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.tosfs.conf.ConfKeys;
import org.apache.hadoop.fs.tosfs.object.MultipartUpload;
import org.apache.hadoop.fs.tosfs.object.ObjectStorage;
import org.apache.hadoop.fs.tosfs.object.ObjectUtils;
import org.apache.hadoop.fs.tosfs.object.Part;
import org.apache.hadoop.fs.tosfs.object.staging.FileStagingPart;
import org.apache.hadoop.fs.tosfs.object.staging.StagingPart;
import org.apache.hadoop.fs.tosfs.util.CommonUtils;
import org.apache.hadoop.fs.tosfs.util.UUIDUtils;
import org.apache.hadoop.util.Lists;
import org.apache.hadoop.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An {@link OutputStream} that buffers written bytes into local {@link StagingPart}s and sends
 * them to an {@link ObjectStorage}.
 *
 * <p>Data is uploaded in one of two ways:
 * <ul>
 *   <li>a single {@code put} at {@link #close()} time, when {@code allowPut} is true and the
 *       total written size never reached the configured multipart threshold;</li>
 *   <li>a multipart upload otherwise: every staging part that reaches the configured part size
 *       is uploaded asynchronously on the supplied executor, and the upload is completed in
 *       {@link #close()}.</li>
 * </ul>
 *
 * <p>{@link #write(byte[], int, int)}, {@link #close()} and {@link #waitForPartsUpload()} are
 * {@code synchronized} on this instance.
 */
public class ObjectOutputStream extends OutputStream {
    private static final Logger LOG = LoggerFactory.getLogger(ObjectOutputStream.class);

    /** Fallback multipart threshold (10 MiB) used when the configuration key is absent. */
    private static final long DEFAULT_MULTIPART_THRESHOLD = 10L * 1024 * 1024;
    /** Fallback upload part size (8 MiB) used when the configuration key is absent. */
    private static final long DEFAULT_MULTIPART_SIZE = 8L * 1024 * 1024;
    /** Fallback staging buffer size (in bytes) used when the configuration key is absent. */
    private static final int DEFAULT_STAGING_BUFFER_SIZE = 4096;

    private final ObjectStorage storage;
    private final ExecutorService uploadPool;
    private long totalWroteSize;
    private final String destKey;
    private final String destScheme;
    private final long multiUploadThreshold;
    private final long byteSizePerPart;
    private final int stagingBufferSize;
    private final boolean allowPut;
    private final List<File> stagingDirs;
    /** Every staging part created by this stream, in creation order. */
    private final List<StagingPart> stagingParts = Lists.newArrayList();
    /** Source of part numbers; the first uploaded part gets number 1. */
    private final AtomicInteger partNumGetter = new AtomicInteger(0);
    /** Null until a multipart upload has been started. */
    private MultipartUpload multipartUpload = null;
    /** One future per part submitted for upload. */
    private final List<CompletableFuture<Part>> results = Lists.newArrayList();
    /** The staging part currently receiving bytes, or null if the next write must open one. */
    private StagingPart curPart;
    private final AtomicBoolean closed = new AtomicBoolean(false);

    /**
     * Creates a stream writing to {@code dest}.
     *
     * @param storage    the object storage receiving the data
     * @param threadPool the executor on which part uploads run
     * @param conf       configuration providing the multipart threshold, part size, staging
     *                   buffer size and staging directories for the scheme of {@code dest}
     * @param dest       the destination path; its scheme selects the configuration keys
     * @param allowPut   if false, a multipart upload is started immediately; if true, it is
     *                   only started once the written size reaches the multipart threshold
     * @throws IllegalArgumentException if the configured part size is not positive, or the
     *                                  staging directories are missing or unusable
     */
    public ObjectOutputStream(ObjectStorage storage, ExecutorService threadPool, Configuration conf, Path dest, boolean allowPut) {
        this.storage = storage;
        this.uploadPool = threadPool;
        this.destScheme = dest.toUri().getScheme();
        this.totalWroteSize = 0L;
        this.destKey = this.createDestKey(dest);
        this.multiUploadThreshold = conf.getLong(ConfKeys.FS_MULTIPART_THRESHOLD.key(this.destScheme), DEFAULT_MULTIPART_THRESHOLD);
        this.byteSizePerPart = conf.getLong(ConfKeys.FS_MULTIPART_SIZE.key(this.destScheme), DEFAULT_MULTIPART_SIZE);
        // A zero part size would make write() spin forever (each iteration would stage zero
        // bytes), so reject non-positive values before any staging dir or upload is created.
        Preconditions.checkArgument(this.byteSizePerPart > 0L,
            "Upload part size must be positive but was %s, please check configure key %s.",
            this.byteSizePerPart, ConfKeys.FS_MULTIPART_SIZE.key(this.destScheme));
        this.stagingBufferSize = conf.getInt(ConfKeys.FS_MULTIPART_STAGING_BUFFER_SIZE.key(this.destScheme), DEFAULT_STAGING_BUFFER_SIZE);
        this.allowPut = allowPut;
        this.stagingDirs = ObjectOutputStream.createStagingDirs(conf, this.destScheme);
        if (!allowPut) {
            this.multipartUpload = storage.createMultipartUpload(this.destKey);
        }
    }

    /**
     * Resolves the configured staging directories, creating those that do not exist yet.
     *
     * @param conf   configuration holding the staging directory list
     * @param scheme scheme used to build the configuration key
     * @return the staging directories, in configured order
     * @throws IllegalArgumentException if the list is empty, a directory cannot be created or
     *                                  made readable/writable, or an entry is not a directory
     */
    private static List<File> createStagingDirs(Configuration conf, String scheme) {
        String confKey = ConfKeys.FS_MULTIPART_STAGING_DIR.key(scheme);
        String[] dirs = conf.getStrings(confKey, ConfKeys.FS_MULTIPART_STAGING_DIR_DEFAULT);
        Preconditions.checkArgument(dirs != null && dirs.length > 0, "'%s' cannot be an empty list", confKey);

        List<File> resolved = new ArrayList<File>();
        for (String dir : dirs) {
            File stagingDir = new File(dir);
            if (!stagingDir.exists() && stagingDir.mkdirs()) {
                // Freshly created: open it up for every user (ownerOnly = false).
                Preconditions.checkArgument(stagingDir.setWritable(true, false),
                    "Failed to change staging dir permission to writable, please check %s with value %s", confKey, dir);
                Preconditions.checkArgument(stagingDir.setReadable(true, false),
                    "Failed to change staging dir permission to readable, please check %s with value %s", confKey, dir);
            } else {
                // Either it already existed, or mkdirs() returned false; it must now exist
                // and be a directory.
                Preconditions.checkArgument(stagingDir.exists(),
                    "Failed to create staging dir, please check %s with value %s", confKey, dir);
                Preconditions.checkArgument(stagingDir.isDirectory(),
                    "Staging dir should be a directory, please check %s with value %s", confKey, dir);
            }
            resolved.add(stagingDir);
        }
        return resolved;
    }

    /** Picks one of the staging directories uniformly at random. */
    private File chooseStagingDir() {
        int index = ThreadLocalRandom.current().nextInt(this.stagingDirs.size());
        return this.stagingDirs.get(index);
    }

    /**
     * Writes the low-order byte of {@code b} by delegating to {@link #write(byte[], int, int)}.
     *
     * @param b the byte to write
     * @throws IOException if staging the byte fails
     */
    @Override
    public void write(int b) throws IOException {
        this.write(new byte[]{(byte) b}, 0, 1);
    }

    /**
     * Maps the destination path to the object key. Called from the constructor, so overriding
     * implementations must not rely on subclass state being initialized.
     *
     * @param dest the destination path
     * @return the object key to write to
     */
    protected String createDestKey(Path dest) {
        return ObjectUtils.pathToKey(dest);
    }

    /**
     * Stages {@code len} bytes from {@code buf} starting at {@code off}. Whenever the current
     * staging part reaches the configured part size it is completed and, if a multipart upload
     * is active, submitted for asynchronous upload. When the total written size first reaches
     * the multipart threshold, a multipart upload is started and all staging parts that have
     * already reached the part size are submitted.
     *
     * @param buf source buffer
     * @param off offset of the first byte to write
     * @param len number of bytes to write; a zero length returns immediately without validation
     * @throws IOException              if writing to or completing a staging part fails
     * @throws IllegalArgumentException if {@code off}/{@code len} do not describe a valid range
     * @throws IllegalStateException    if the stream is closed, or the configured part size is
     *                                  smaller than the upload's minimal part size
     */
    @Override
    public synchronized void write(byte[] buf, int off, int len) throws IOException {
        if (len == 0) {
            return;
        }
        Preconditions.checkArgument(off >= 0 && off < buf.length,
            "Invalid offset - off: %s, len: %s, bufferSize: %s", off, len, buf.length);
        // Compare against the remaining capacity instead of computing off + len, which could
        // overflow to a negative int and wrongly pass the check.
        Preconditions.checkArgument(len >= 0 && len <= buf.length - off,
            "Invalid length - off: %s, len: %s, bufferSize: %s", off, len, buf.length);
        Preconditions.checkState(!this.closed.get(), "OutputStream is closed.");

        while (len > 0) {
            if (this.curPart == null) {
                this.curPart = this.newStagingPart();
            }
            Preconditions.checkArgument(this.curPart.size() <= this.byteSizePerPart,
                "Invalid staging size (%s) which is greater than part size (%s)",
                this.curPart.size(), this.byteSizePerPart);

            // Number of bytes that still fit into the current part, capped by what is left.
            int size = (int) Math.min(this.byteSizePerPart - this.curPart.size(), (long) len);
            this.curPart.write(buf, off, size);
            off += size;
            len -= size;
            this.totalWroteSize += size;

            // The current part is full: seal it and move on to a fresh one.
            if (this.curPart.size() >= this.byteSizePerPart) {
                this.curPart.complete();
                if (this.multipartUpload != null) {
                    this.results.add(this.asyncUploadPart(this.curPart, this.partNumGetter.incrementAndGet()));
                }
                this.curPart = null;
            }

            // Switch to multipart mode the first time the threshold is reached.
            if (this.multipartUpload == null && this.totalWroteSize >= this.multiUploadThreshold) {
                this.multipartUpload = this.storage.createMultipartUpload(this.destKey);
                Preconditions.checkState(this.byteSizePerPart >= (long) this.multipartUpload.minPartSize(),
                    "Configured upload part size %s must be greater than or equals to the minimal part size %s, please check configure key %s.",
                    this.byteSizePerPart, this.multipartUpload.minPartSize(),
                    ConfKeys.FS_MULTIPART_SIZE.key(this.destScheme));
                // Submit every part that filled up while no multipart upload existed; a
                // partially filled current part is below the part size and is skipped.
                for (StagingPart stagingPart : this.stagingParts) {
                    if (stagingPart.size() >= this.byteSizePerPart) {
                        this.results.add(this.asyncUploadPart(stagingPart, this.partNumGetter.incrementAndGet()));
                    }
                }
            }
        }
    }

    /**
     * Submits the upload of one staging part to the upload pool. The staging part is cleaned
     * up once the upload finishes, whether it succeeded or failed; failures are logged and
     * remain visible through the returned future.
     *
     * @param stagingPart the completed staging part to upload
     * @param partNum     the part number to upload it as
     * @return a future yielding the uploaded part
     */
    private CompletableFuture<Part> asyncUploadPart(StagingPart stagingPart, int partNum) {
        // Capture the upload handle now so the async task never reads the mutable field.
        final MultipartUpload immutableUpload = this.multipartUpload;
        return CompletableFuture
            .supplyAsync(() -> this.uploadPart(immutableUpload, stagingPart, partNum), this.uploadPool)
            .whenComplete((part, err) -> {
                stagingPart.cleanup();
                if (err != null) {
                    LOG.error("Failed to upload part, multipartUpload: {}, partNum: {}, stagingPart: {}",
                        immutableUpload, partNum, stagingPart, err);
                }
            });
    }

    /**
     * Submits the upload of a zero-length part to the upload pool; used by {@link #close()}
     * when a multipart upload is active but nothing was written.
     *
     * @param partNum the part number to upload it as
     * @return a future yielding the uploaded part
     */
    private CompletableFuture<Part> asyncUploadEmptyPart(int partNum) {
        final MultipartUpload immutableUpload = this.multipartUpload;
        return CompletableFuture
            .supplyAsync(() -> this.storage.uploadPart(immutableUpload.key(), immutableUpload.uploadId(), partNum,
                () -> new ByteArrayInputStream(new byte[0]), 0L), this.uploadPool)
            .whenComplete((part, err) -> {
                if (err != null) {
                    LOG.error("Failed to upload empty part, multipartUpload: {}, partNum: {}",
                        immutableUpload, partNum, err);
                }
            });
    }

    /**
     * Uploads one staging part as part {@code partNum} of {@code upload}.
     *
     * @param upload      the multipart upload the part belongs to
     * @param stagingPart the staging part supplying the bytes
     * @param partNum     the part number
     * @return the uploaded part as reported by the storage
     */
    private Part uploadPart(MultipartUpload upload, StagingPart stagingPart, int partNum) {
        Preconditions.checkNotNull(this.storage, "Object storage cannot be null.");
        Preconditions.checkNotNull(upload, "Multipart upload is not initialized.");
        return this.storage.uploadPart(upload.key(), upload.uploadId(), partNum, stagingPart::newIn, stagingPart.size());
    }

    /**
     * Completes the multipart upload. Subclasses may override to change how an upload is
     * finalized.
     *
     * @param key      the object key of the upload
     * @param uploadId the multipart upload id
     * @param parts    the uploaded parts, sorted by part number
     * @throws IOException if completing the upload fails
     */
    protected void finishUpload(String key, String uploadId, List<Part> parts) throws IOException {
        this.storage.completeUpload(key, uploadId, parts);
    }

    /**
     * Uploads everything staged so far with a single {@code put}: the content is the
     * concatenation of all staging parts in creation order, or an empty stream if there are
     * none.
     *
     * @throws IOException if completing the current staging part fails
     */
    private void simplePut() throws IOException {
        if (this.curPart != null) {
            this.curPart.complete();
        }
        this.storage.put(this.destKey, () -> this.stagingParts().stream().map(StagingPart::newIn).reduce(SequenceInputStream::new).orElseGet(() -> new ByteArrayInputStream(new byte[0])), this.stagingParts().stream().mapToLong(StagingPart::size).sum());
        this.curPart = null;
    }

    /**
     * Blocks until every submitted part upload has finished.
     *
     * @return the uploaded parts sorted by part number
     * @throws IllegalArgumentException if no multipart upload is active or no part was submitted
     * @throws java.util.concurrent.CompletionException if any part upload failed
     */
    synchronized List<Part> waitForPartsUpload() {
        Preconditions.checkArgument(this.multipartUpload != null, "Multipart upload cannot be null");
        Preconditions.checkArgument(!this.results.isEmpty(), "Upload parts cannot be empty");
        return this.results.stream()
            .map(CompletableFuture::join)
            .sorted(Comparator.comparing(Part::num))
            .collect(Collectors.toList());
    }

    /**
     * Flushes the remaining data and finalizes the object. Only the first call has any effect.
     *
     * <p>If no multipart upload was started and a simple put is allowed, everything is sent
     * with one {@code put}. Otherwise the trailing staging part (or an empty part when nothing
     * was written) is uploaded, all part uploads are awaited and the multipart upload is
     * completed. On any failure an active multipart upload is aborted on a best-effort basis
     * and the error is rethrown. All staging parts are cleaned up in every case.
     *
     * @throws IOException if uploading or completing the upload fails
     */
    @Override
    public synchronized void close() throws IOException {
        if (!this.closed.compareAndSet(false, true)) {
            return;
        }
        try {
            if (this.multipartUpload == null && this.allowPut) {
                this.simplePut();
                return;
            }
            Preconditions.checkNotNull(this.multipartUpload, "MultipartUpload cannot be null since allowPut was disabled.");
            if (this.totalWroteSize <= 0L) {
                // Nothing was written: submit a single empty part so the parts list is non-empty.
                this.results.add(this.asyncUploadEmptyPart(this.partNumGetter.incrementAndGet()));
            } else if (this.curPart != null) {
                // Upload the trailing, partially filled part.
                this.curPart.complete();
                this.results.add(this.asyncUploadPart(this.curPart, this.partNumGetter.incrementAndGet()));
                this.curPart = null;
            }
            this.finishUpload(this.multipartUpload.key(), this.multipartUpload.uploadId(), this.waitForPartsUpload());
        } catch (Exception e) {
            LOG.error("Encountering error when closing output stream", e);
            if (this.multipartUpload != null) {
                CommonUtils.runQuietly(() -> this.storage.abortMultipartUpload(this.multipartUpload.key(), this.multipartUpload.uploadId()));
            }
            throw e;
        } finally {
            this.deleteStagingPart(this.stagingParts);
        }
    }

    /** Returns the total number of bytes written to this stream so far. */
    public long totalWroteSize() {
        return this.totalWroteSize;
    }

    /** Returns the object storage this stream writes to. */
    public ObjectStorage storage() {
        return this.storage;
    }

    /** Returns the live (not copied) list of staging parts created by this stream. */
    public List<StagingPart> stagingParts() {
        return this.stagingParts;
    }

    /** Returns the destination object key. */
    public String destKey() {
        return this.destKey;
    }

    /** Returns the active multipart upload, or null if none has been started. */
    public MultipartUpload upload() {
        return this.multipartUpload;
    }

    /** Cleans up every given staging part. */
    private void deleteStagingPart(List<StagingPart> parts) {
        for (StagingPart part : parts) {
            part.cleanup();
        }
    }

    /**
     * Creates a new file-backed staging part under a randomly chosen staging directory and
     * registers it in the staging part list.
     */
    private StagingPart newStagingPart() {
        String stagingPath = String.format("%s/staging-%s.tmp", this.chooseStagingDir(), UUIDUtils.random());
        FileStagingPart part = new FileStagingPart(stagingPath, this.stagingBufferSize);
        this.stagingParts.add(part);
        return part;
    }
}

