package org.apache.hadoop.fs.obs;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.obs.services.exception.ObsException;
import com.obs.services.model.CompleteMultipartUploadResult;
import com.obs.services.model.PartEtag;
import com.obs.services.model.PutObjectRequest;
import com.obs.services.model.UploadPartRequest;
import com.obs.services.model.UploadPartResult;
import com.sun.istack.NotNull;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Syncable;
import org.apache.hadoop.fs.obs.BasicMetricsConsumer;
import org.apache.hadoop.fs.obs.OBSDataBlocks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
@InterfaceAudience.Private
@InterfaceStability.Unstable
/* loaded from: input_file:org/apache/hadoop/fs/obs/OBSBlockOutputStream.class */
public class OBSBlockOutputStream extends OutputStream implements Syncable {
    private static final Logger LOG = LoggerFactory.getLogger(OBSBlockOutputStream.class);
    private final OBSFileSystem fs;
    private final String key;
    private final String uri;
    private long objectLen;
    private final int blockSize;
    private final ListeningExecutorService executorService;
    private final OBSDataBlocks.BlockFactory blockFactory;
    private final AtomicBoolean appendAble;
    private MultiPartUpload multiPartUpload;
    private OBSDataBlocks.DataBlock activeBlock;
    private OBSWriteOperationHelper writeOperationHelper;
    private String hflushPolicy;
    private final byte[] singleCharWrite = new byte[1];
    private volatile boolean closed = false;
    private final AtomicBoolean hasException = new AtomicBoolean(false);
    private long blockCount = 0;
    private boolean mockUploadPartError = false;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hadoop/fs/obs/OBSBlockOutputStream$MultiPartUpload.class */
    public class MultiPartUpload {
        private final String uploadId;
        private final List<ListenableFuture<Pair<PartEtag, Integer>>> partETagsFutures = new ArrayList(2);

        MultiPartUpload() throws IOException {
            this.uploadId = OBSBlockOutputStream.this.writeOperationHelper.initiateMultiPartUpload(OBSBlockOutputStream.this.key);
            OBSBlockOutputStream.LOG.debug("Initiated multi-part upload for {} with , the key is {}id '{}'", new Object[]{OBSBlockOutputStream.this.writeOperationHelper, this.uploadId, OBSBlockOutputStream.this.key});
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void uploadBlockAsync(OBSDataBlocks.DataBlock dataBlock) throws IOException {
            OBSBlockOutputStream.LOG.debug("Queueing upload of {}", dataBlock);
            int dataSize = dataBlock.dataSize();
            int size = this.partETagsFutures.size() + 1;
            UploadPartRequest newUploadPartRequest = dataBlock instanceof OBSDataBlocks.DiskBlock ? OBSBlockOutputStream.this.writeOperationHelper.newUploadPartRequest(OBSBlockOutputStream.this.key, this.uploadId, size, dataSize, (File) dataBlock.startUpload()) : OBSBlockOutputStream.this.writeOperationHelper.newUploadPartRequest(OBSBlockOutputStream.this.key, this.uploadId, size, dataSize, (InputStream) dataBlock.startUpload());
            this.partETagsFutures.add(OBSBlockOutputStream.this.executorService.submit(() -> {
                OBSBlockOutputStream.LOG.debug("Uploading part {} for id '{}'", Integer.valueOf(size), this.uploadId);
                try {
                    try {
                        if (OBSBlockOutputStream.this.mockUploadPartError) {
                            throw new ObsException("mock upload part error");
                        }
                        UploadPartResult uploadPart = OBSCommonUtils.uploadPart(OBSBlockOutputStream.this.fs, newUploadPartRequest);
                        PartEtag partEtag = new PartEtag(uploadPart.getEtag(), Integer.valueOf(uploadPart.getPartNumber()));
                        if (OBSBlockOutputStream.LOG.isDebugEnabled()) {
                            OBSBlockOutputStream.LOG.debug("Completed upload of {} to part {}", dataBlock, partEtag);
                        }
                        OBSCommonUtils.closeAll(dataBlock);
                        return new Pair(partEtag, Integer.valueOf(dataSize));
                    } catch (ObsException e) {
                        OBSBlockOutputStream.this.hasException.set(true);
                        IOException translateException = OBSCommonUtils.translateException("UploadPart", OBSBlockOutputStream.this.key, e);
                        OBSBlockOutputStream.LOG.error("UploadPart failed (ObsException). {}", translateException.getMessage());
                        throw translateException;
                    }
                } catch (Throwable th) {
                    OBSCommonUtils.closeAll(dataBlock);
                    throw th;
                }
            }));
        }

        /* JADX INFO: Access modifiers changed from: private */
        public List<Pair<PartEtag, Integer>> waitForAllPartUploads() throws IOException {
            OBSBlockOutputStream.LOG.debug("Waiting for {} uploads to complete", Integer.valueOf(this.partETagsFutures.size()));
            try {
                return (List) Futures.allAsList(this.partETagsFutures).get();
            } catch (InterruptedException e) {
                OBSBlockOutputStream.LOG.warn("Interrupted partUpload", e);
                OBSBlockOutputStream.LOG.debug("Cancelling futures");
                Iterator<ListenableFuture<Pair<PartEtag, Integer>>> it = this.partETagsFutures.iterator();
                while (it.hasNext()) {
                    it.next().cancel(true);
                }
                abort();
                throw new IOException("Interrupted multi-part upload with id '" + this.uploadId + "' to " + OBSBlockOutputStream.this.key);
            } catch (ExecutionException e2) {
                OBSBlockOutputStream.LOG.debug("While waiting for upload completion", e2);
                OBSBlockOutputStream.LOG.debug("Cancelling futures");
                Iterator<ListenableFuture<Pair<PartEtag, Integer>>> it2 = this.partETagsFutures.iterator();
                while (it2.hasNext()) {
                    it2.next().cancel(true);
                }
                abort();
                throw OBSCommonUtils.extractException("Multi-part upload with id '" + this.uploadId + "' to " + OBSBlockOutputStream.this.key, OBSBlockOutputStream.this.key, e2);
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public CompleteMultipartUploadResult complete(List<PartEtag> list) throws IOException {
            String format = String.format("Completing multi-part upload for key '%s', id '%s' with %s partitions ", OBSBlockOutputStream.this.key, this.uploadId, Integer.valueOf(list.size()));
            try {
                OBSBlockOutputStream.LOG.debug(format);
                return OBSBlockOutputStream.this.writeOperationHelper.completeMultipartUpload(OBSBlockOutputStream.this.key, this.uploadId, list);
            } catch (ObsException e) {
                throw OBSCommonUtils.translateException(format, OBSBlockOutputStream.this.key, e);
            }
        }

        void abort() {
            try {
                OBSBlockOutputStream.LOG.debug(String.format("Aborting multi-part upload for '%s', id '%s", OBSBlockOutputStream.this.writeOperationHelper, this.uploadId));
                OBSBlockOutputStream.this.writeOperationHelper.abortMultipartUpload(OBSBlockOutputStream.this.key, this.uploadId);
            } catch (ObsException e) {
                OBSBlockOutputStream.LOG.warn("Unable to abort multipart upload, you may need to purge  uploaded parts", e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public OBSBlockOutputStream(OBSFileSystem oBSFileSystem, String str, long j, ExecutorService executorService, boolean z) throws IOException {
        this.hflushPolicy = "sync";
        this.appendAble = new AtomicBoolean(z);
        this.fs = oBSFileSystem;
        this.key = str;
        this.uri = "obs://" + this.fs.getBucket() + "/" + this.key;
        this.objectLen = j;
        this.blockFactory = oBSFileSystem.getBlockFactory();
        this.blockSize = (int) oBSFileSystem.getPartSize();
        this.writeOperationHelper = oBSFileSystem.getWriteHelper();
        Preconditions.checkArgument(oBSFileSystem.getPartSize() >= 5242880, "Block size is too small: %d", new Object[]{Long.valueOf(oBSFileSystem.getPartSize())});
        this.executorService = MoreExecutors.listeningDecorator(executorService);
        this.multiPartUpload = null;
        this.hflushPolicy = oBSFileSystem.getConf().get("fs.obs.outputstream.hflush.policy", "sync");
        createBlockIfNeeded();
        LOG.debug("Initialized OBSBlockOutputStream for {} output to {}", oBSFileSystem.getWriteHelper(), this.activeBlock);
    }

    private synchronized OBSDataBlocks.DataBlock createBlockIfNeeded() throws IOException {
        if (this.activeBlock == null) {
            this.blockCount++;
            if (this.blockCount >= 10000) {
                LOG.debug("Number of partitions in stream exceeds limit for OBS: 10000 write may fail.");
            }
            this.activeBlock = this.blockFactory.create(this.blockCount, this.blockSize);
        }
        return this.activeBlock;
    }

    synchronized OBSDataBlocks.DataBlock getActiveBlock() {
        return this.activeBlock;
    }

    @VisibleForTesting
    void mockPutPartError(boolean z) {
        this.mockUploadPartError = z;
    }

    private synchronized boolean hasActiveBlock() {
        return this.activeBlock != null;
    }

    private synchronized void clearActiveBlock() {
        if (this.activeBlock != null) {
            LOG.debug("Clearing active block");
        }
        this.activeBlock = null;
    }

    private void checkStreamOpen() throws IOException {
        if (this.closed) {
            throw new IOException(this.uri + ": Stream is closed!");
        }
    }

    @Override // java.io.OutputStream, java.io.Flushable
    public synchronized void flush() throws IOException {
        this.fs.checkOpen();
        checkStreamOpen();
        OBSDataBlocks.DataBlock activeBlock = getActiveBlock();
        if (activeBlock != null) {
            activeBlock.flush();
        }
    }

    @Override // java.io.OutputStream
    public synchronized void write(int i) throws IOException {
        this.fs.checkOpen();
        checkStreamOpen();
        this.singleCharWrite[0] = (byte) i;
        write(this.singleCharWrite, 0, 1);
    }

    @Override // java.io.OutputStream
    public synchronized void write(@NotNull byte[] bArr, int i, int i2) throws IOException {
        this.fs.checkOpen();
        checkStreamOpen();
        long currentTimeMillis = System.currentTimeMillis();
        if (this.hasException.get()) {
            String format = String.format("write has error. bs : pre upload obs[%s] has error.", this.key);
            LOG.warn(format);
            throw new IOException(format);
        }
        OBSDataBlocks.validateWriteArgs(bArr, i, i2);
        if (i2 == 0) {
            return;
        }
        OBSDataBlocks.DataBlock createBlockIfNeeded = createBlockIfNeeded();
        try {
            innerWrite(bArr, i, i2, createBlockIfNeeded.write(bArr, i, i2), createBlockIfNeeded.remainingCapacity());
            long currentTimeMillis2 = System.currentTimeMillis();
            if (this.fs.getMetricSwitch()) {
                OBSCommonUtils.setMetricsInfo(this.fs, new BasicMetricsConsumer.MetricRecord(null, "write", true, currentTimeMillis2 - currentTimeMillis));
            }
        } catch (IOException e) {
            long currentTimeMillis3 = System.currentTimeMillis();
            if (this.fs.getMetricSwitch()) {
                OBSCommonUtils.setMetricsInfo(this.fs, new BasicMetricsConsumer.MetricRecord(null, "write", false, currentTimeMillis3 - currentTimeMillis));
            }
            LOG.error("Write data for key {} of bucket {} error, error message {}", new Object[]{this.key, this.fs.getBucket(), e.getMessage()});
            throw e;
        }
    }

    private synchronized void innerWrite(byte[] bArr, int i, int i2, int i3, int i4) throws IOException {
        if (i3 < i2) {
            LOG.debug("writing more data than block has capacity -triggering upload");
            if (this.appendAble.get()) {
                LOG.debug("[Append] open stream and single write size {} greater than buffer size {}, append buffer to obs.", Integer.valueOf(i2), Integer.valueOf(this.blockSize));
                flushCurrentBlock();
            } else {
                uploadCurrentBlock();
            }
            write(bArr, i + i3, i2 - i3);
            return;
        }
        if (i4 == 0) {
            if (!this.appendAble.get()) {
                uploadCurrentBlock();
            } else {
                LOG.debug("[Append] open stream and already write size equal to buffer size {}, append buffer to obs.", Integer.valueOf(this.blockSize));
                flushCurrentBlock();
            }
        }
    }

    private synchronized void uploadCurrentBlock() throws IOException {
        Preconditions.checkState(hasActiveBlock(), "No active block");
        LOG.debug("Writing block # {}", Long.valueOf(this.blockCount));
        try {
            try {
                if (this.multiPartUpload == null) {
                    LOG.debug("Initiating Multipart upload");
                    this.multiPartUpload = new MultiPartUpload();
                }
                this.multiPartUpload.uploadBlockAsync(getActiveBlock());
                clearActiveBlock();
            } catch (IOException e) {
                this.hasException.set(true);
                LOG.error("Upload current block on ({}/{}) failed.", new Object[]{this.fs.getBucket(), this.key, e});
                throw e;
            }
        } catch (Throwable th) {
            clearActiveBlock();
            throw th;
        }
    }

    @Override // java.io.OutputStream, java.io.Closeable, java.lang.AutoCloseable
    public synchronized void close() throws IOException {
        long currentTimeMillis = System.currentTimeMillis();
        if (this.closed) {
            LOG.debug("Ignoring close() as stream is already closed");
            return;
        }
        if (this.hasException.get()) {
            String format = String.format("closed has error. bs : pre write obs[%s] has error.", this.key);
            LOG.warn(format);
            long currentTimeMillis2 = System.currentTimeMillis();
            if (this.fs.getMetricSwitch()) {
                OBSCommonUtils.setMetricsInfo(this.fs, new BasicMetricsConsumer.MetricRecord("output", BasicMetricsConsumer.MetricRecord.CLOSE, false, currentTimeMillis2 - currentTimeMillis));
            }
            throw new IOException(format);
        }
        this.fs.checkOpen();
        completeCurrentBlock();
        clearHFlushOrSync();
        this.writeOperationHelper.writeSuccessful(this.key);
        this.fs.removeFileBeingWritten(this.key);
        long currentTimeMillis3 = System.currentTimeMillis();
        if (this.fs.getMetricSwitch()) {
            OBSCommonUtils.setMetricsInfo(this.fs, new BasicMetricsConsumer.MetricRecord("output", BasicMetricsConsumer.MetricRecord.CLOSE, true, currentTimeMillis3 - currentTimeMillis));
        }
        this.closed = true;
    }

    private synchronized void putObjectIfNeedAppend() throws IOException {
        if (this.appendAble.get() && this.fs.exists(OBSCommonUtils.keyToQualifiedPath(this.fs, this.key))) {
            appendFsFile();
        } else {
            putObject();
        }
    }

    private synchronized void appendFsFile() throws IOException {
        LOG.debug("bucket is posix, to append file. key is {}", this.key);
        OBSDataBlocks.DataBlock activeBlock = getActiveBlock();
        OBSCommonUtils.appendFile(this.fs, activeBlock instanceof OBSDataBlocks.DiskBlock ? OBSCommonUtils.newAppendFileRequest(this.fs, this.key, this.objectLen, (File) activeBlock.startUpload()) : OBSCommonUtils.newAppendFileRequest(this.fs, this.key, this.objectLen, (InputStream) activeBlock.startUpload()));
        this.objectLen += activeBlock.dataSize();
    }

    private synchronized void putObject() throws IOException {
        LOG.debug("Executing regular upload for {}", this.writeOperationHelper.toString(this.key));
        OBSDataBlocks.DataBlock activeBlock = getActiveBlock();
        clearActiveBlock();
        int dataSize = activeBlock.dataSize();
        PutObjectRequest newPutRequest = activeBlock instanceof OBSDataBlocks.DiskBlock ? this.writeOperationHelper.newPutRequest(this.key, (File) activeBlock.startUpload()) : this.writeOperationHelper.newPutRequest(this.key, (InputStream) activeBlock.startUpload(), dataSize);
        newPutRequest.setAcl(this.fs.getCannedACL());
        this.fs.getSchemeStatistics().incrementWriteOps(1);
        try {
            this.writeOperationHelper.putObject(newPutRequest);
            this.objectLen += dataSize;
            OBSCommonUtils.closeAll(activeBlock);
        } catch (Throwable th) {
            OBSCommonUtils.closeAll(activeBlock);
            throw th;
        }
    }

    public synchronized String toString() {
        StringBuilder sb = new StringBuilder("OBSBlockOutputStream{");
        sb.append(this.writeOperationHelper.toString());
        sb.append(", blockSize=").append(this.blockSize);
        OBSDataBlocks.DataBlock dataBlock = this.activeBlock;
        if (dataBlock != null) {
            sb.append(", activeBlock=").append(dataBlock);
        }
        sb.append('}');
        return sb.toString();
    }

    public synchronized void sync() throws IOException {
        this.fs.checkOpen();
        checkStreamOpen();
    }

    public synchronized void hflush() throws IOException {
        this.fs.checkOpen();
        checkStreamOpen();
        long currentTimeMillis = System.currentTimeMillis();
        String str = this.hflushPolicy;
        boolean z = -1;
        switch (str.hashCode()) {
            case 3545755:
                if (str.equals("sync")) {
                    z = false;
                    break;
                }
                break;
            case 96634189:
                if (str.equals("empty")) {
                    z = 2;
                    break;
                }
                break;
            case 97532676:
                if (str.equals("flush")) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                flushOrSync();
                break;
            case true:
                flush();
                break;
            case true:
                break;
            default:
                throw new IOException(String.format("unsupported downgrade policy '%s'", this.hflushPolicy));
        }
        long currentTimeMillis2 = System.currentTimeMillis();
        if (this.fs.getMetricSwitch()) {
            OBSCommonUtils.setMetricsInfo(this.fs, new BasicMetricsConsumer.MetricRecord(null, "hflush", true, currentTimeMillis2 - currentTimeMillis));
        }
    }

    private synchronized void flushOrSync() throws IOException {
        checkStreamOpen();
        if (this.hasException.get()) {
            String format = String.format("flushOrSync has error. bs : pre write obs[%s] has error.", this.key);
            LOG.warn(format);
            throw new IOException(format);
        }
        if (this.fs.isFsBucket()) {
            flushCurrentBlock();
            clearHFlushOrSync();
        } else {
            LOG.warn("not posix bucket, not support hflush or hsync.");
            flush();
        }
    }

    private synchronized void clearHFlushOrSync() {
        this.appendAble.set(true);
        this.multiPartUpload = null;
    }

    private synchronized void uploadWriteBlocks(OBSDataBlocks.DataBlock dataBlock, boolean z) throws IOException {
        if (this.multiPartUpload != null) {
            if (z && dataBlock.hasData()) {
                uploadCurrentBlock();
            }
            List<Pair> waitForAllPartUploads = this.multiPartUpload.waitForAllPartUploads();
            ArrayList arrayList = new ArrayList();
            int i = 0;
            for (Pair pair : waitForAllPartUploads) {
                arrayList.add(pair.getKey());
                i += ((Integer) pair.getValue()).intValue();
            }
            this.multiPartUpload.complete(arrayList);
            this.objectLen = i;
        } else if (z) {
            putObjectIfNeedAppend();
        }
        LOG.debug("Upload complete for {}", this.writeOperationHelper.toString(this.key));
    }

    /* JADX WARN: Finally extract failed */
    private synchronized void completeCurrentBlock() throws IOException {
        OBSDataBlocks.DataBlock activeBlock = getActiveBlock();
        boolean hasActiveBlock = hasActiveBlock();
        Logger logger = LOG;
        Object[] objArr = new Object[3];
        objArr[0] = this;
        objArr[1] = Long.valueOf(this.blockCount);
        objArr[2] = hasActiveBlock ? activeBlock : "(none)";
        logger.debug("{}: complete block #{}: current block= {}", objArr);
        try {
            try {
                try {
                    uploadWriteBlocks(activeBlock, hasActiveBlock);
                    OBSCommonUtils.closeAll(activeBlock);
                    clearActiveBlock();
                } catch (Exception e) {
                    LOG.error("Upload data to obs error. other exception : {}", e.getMessage());
                    throw e;
                }
            } catch (IOException e2) {
                LOG.error("Upload data to obs error. io exception : {}", e2.getMessage());
                throw e2;
            }
        } catch (Throwable th) {
            OBSCommonUtils.closeAll(activeBlock);
            clearActiveBlock();
            throw th;
        }
    }

    private synchronized void flushCurrentBlock() throws IOException {
        OBSDataBlocks.DataBlock activeBlock = getActiveBlock();
        boolean hasActiveBlock = hasActiveBlock();
        Logger logger = LOG;
        Object[] objArr = new Object[3];
        objArr[0] = this;
        objArr[1] = Long.valueOf(this.blockCount);
        objArr[2] = hasActiveBlock ? activeBlock : "(none)";
        logger.debug("{}: complete block #{}: current block= {}", objArr);
        try {
            try {
                try {
                    uploadWriteBlocks(activeBlock, hasActiveBlock);
                    OBSCommonUtils.closeAll(activeBlock);
                    clearActiveBlock();
                } catch (Exception e) {
                    LOG.error("hflush data to obs error. other exception : {}", e.getMessage());
                    this.hasException.set(true);
                    throw e;
                }
            } catch (IOException e2) {
                LOG.error("hflush data to obs error. io exception : {}", e2.getMessage());
                this.hasException.set(true);
                throw e2;
            }
        } catch (Throwable th) {
            OBSCommonUtils.closeAll(activeBlock);
            clearActiveBlock();
            throw th;
        }
    }

    public synchronized void hsync() throws IOException {
        this.fs.checkOpen();
        checkStreamOpen();
        long currentTimeMillis = System.currentTimeMillis();
        String str = this.hflushPolicy;
        boolean z = -1;
        switch (str.hashCode()) {
            case 3545755:
                if (str.equals("sync")) {
                    z = false;
                    break;
                }
                break;
            case 96634189:
                if (str.equals("empty")) {
                    z = 2;
                    break;
                }
                break;
            case 97532676:
                if (str.equals("flush")) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                flushOrSync();
                break;
            case true:
                sync();
                break;
            case true:
                break;
            default:
                throw new IOException(String.format("unsupported downgrade policy '%s'", this.hflushPolicy));
        }
        long currentTimeMillis2 = System.currentTimeMillis();
        if (this.fs.getMetricSwitch()) {
            OBSCommonUtils.setMetricsInfo(this.fs, new BasicMetricsConsumer.MetricRecord(null, "hflush", true, currentTimeMillis2 - currentTimeMillis));
        }
    }
}
