package alluxio.client.block.stream;

import alluxio.client.WriteType;
import alluxio.client.file.FileSystemContext;
import alluxio.client.file.options.OutStreamOptions;
import alluxio.conf.AlluxioConfiguration;
import alluxio.conf.PropertyKey;
import alluxio.grpc.CreateLocalBlockRequest;
import alluxio.grpc.CreateLocalBlockResponse;
import alluxio.metrics.MetricKey;
import alluxio.metrics.MetricsSystem;
import alluxio.resource.CloseableResource;
import alluxio.shaded.client.com.google.common.base.MoreObjects;
import alluxio.shaded.client.com.google.common.base.Preconditions;
import alluxio.shaded.client.com.google.common.io.Closer;
import alluxio.shaded.client.io.netty.buffer.ByteBuf;
import alluxio.shaded.client.javax.annotation.concurrent.NotThreadSafe;
import alluxio.util.CommonUtils;
import alluxio.wire.WorkerNetAddress;
import alluxio.worker.block.io.LocalFileBlockWriter;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link DataWriter} that appends chunks to a block file through a
 * {@link LocalFileBlockWriter}. A {@code createLocalBlock} gRPC stream to the worker is used to
 * create the block, obtain the path of its file, and reserve additional space as the write
 * position grows.
 *
 * <p>Instances are obtained through {@link #create}. The class is annotated
 * {@link NotThreadSafe}.
 */
@NotThreadSafe
public final class LocalFileDataWriter implements DataWriter {
    private static final Logger LOG = LoggerFactory.getLogger(LocalFileDataWriter.class);

    /** Lower bound on the size of every follow-up space reservation. */
    private final long mFileBufferBytes;
    /** Timeout, in milliseconds, applied to every send/receive/wait on the stream. */
    private final long mDataTimeoutMs;
    /** Writer that appends the chunk data to the block file. */
    private final LocalFileBlockWriter mWriter;
    /** Value reported by {@link #chunkSize()}. */
    private final long mChunkSize;
    /** The initial create request; used as the template for follow-up reserve requests. */
    private final CreateLocalBlockRequest mCreateRequest;
    /** Owns the resources registered in {@link #create}; closed by cancel() and close(). */
    private final Closer mCloser;
    /** The request/response stream to the worker. */
    private final GrpcBlockingStream<CreateLocalBlockRequest, CreateLocalBlockResponse> mStream;

    /** Number of bytes accepted by {@link #writeChunk} so far. */
    private long mPos;
    /** Total number of bytes reserved on the worker so far. */
    private long mPosReserved;

    /**
     * Creates a {@link LocalFileDataWriter}: acquires a block worker client, sends the create
     * request over a new stream, waits for a response carrying the block file path, and opens a
     * {@link LocalFileBlockWriter} on that path.
     *
     * <p>If any step fails, everything registered with the closer so far is closed and the
     * failure is rethrown via {@link CommonUtils#closeAndRethrow}.
     *
     * @param fileSystemContext the context supplying the cluster configuration and worker client
     * @param workerNetAddress the address of the worker to write to
     * @param blockId the id of the block to create
     * @param blockSize upper bound on the initial space reservation; the reservation is the
     *        smaller of this value and {@code USER_FILE_RESERVED_BYTES} (presumably the expected
     *        block size; confirm against callers)
     * @param outStreamOptions supplies the write tier, medium type and write type
     * @return the new writer
     * @throws IOException if the writer cannot be created
     */
    public static LocalFileDataWriter create(FileSystemContext fileSystemContext, WorkerNetAddress workerNetAddress, long blockId, long blockSize, OutStreamOptions outStreamOptions) throws IOException {
        AlluxioConfiguration conf = fileSystemContext.getClusterConf();
        long chunkSize = conf.getBytes(PropertyKey.USER_LOCAL_WRITER_CHUNK_SIZE_BYTES);
        Closer closer = Closer.create();
        try {
            CloseableResource<BlockWorkerClient> blockWorker = fileSystemContext.acquireBlockWorkerClient(workerNetAddress);
            closer.register(blockWorker);

            int writerBufferSizeMessages = conf.getInt(PropertyKey.USER_STREAMING_WRITER_BUFFER_SIZE_MESSAGES);
            long fileBufferBytes = conf.getBytes(PropertyKey.USER_FILE_BUFFER_BYTES);
            long dataTimeoutMs = conf.getMs(PropertyKey.USER_STREAMING_DATA_WRITE_TIMEOUT);
            long reservedBytes = Math.min(blockSize, conf.getBytes(PropertyKey.USER_FILE_RESERVED_BYTES));
            boolean asyncThrough = outStreamOptions.getWriteType() == WriteType.ASYNC_THROUGH;

            CreateLocalBlockRequest.Builder builder = CreateLocalBlockRequest.newBuilder()
                    .setBlockId(blockId)
                    .setTier(outStreamOptions.getWriteTier())
                    .setSpaceToReserve(reservedBytes)
                    .setMediumType(outStreamOptions.getMediumType())
                    .setPinOnCreate(asyncThrough);
            if (asyncThrough && conf.getBoolean(PropertyKey.USER_FILE_UFS_TIER_ENABLED)) {
                builder.setCleanupOnFailure(false);
            }
            CreateLocalBlockRequest createRequest = builder.build();

            // Binding the method reference throws NullPointerException if the client is null,
            // so no separate null check is needed.
            BlockWorkerClient client = blockWorker.get();
            String description = MoreObjects.toStringHelper(LocalFileDataWriter.class)
                    .add("request", createRequest)
                    .add("address", workerNetAddress)
                    .toString();
            GrpcBlockingStream<CreateLocalBlockRequest, CreateLocalBlockResponse> stream =
                    new GrpcBlockingStream<>(client::createLocalBlock, writerBufferSizeMessages, description);

            stream.send(createRequest, dataTimeoutMs);
            CreateLocalBlockResponse response = stream.receive(dataTimeoutMs);
            // The first response must carry the path of the block file.
            Preconditions.checkState(response != null && response.hasPath());

            LocalFileBlockWriter writer = closer.register(new LocalFileBlockWriter(response.getPath()));
            return new LocalFileDataWriter(chunkSize, writer, createRequest, stream, closer, fileBufferBytes, dataTimeoutMs);
        } catch (Exception e) {
            throw CommonUtils.closeAndRethrow(closer, e);
        }
    }

    /**
     * @return the number of bytes accepted by {@link #writeChunk} so far
     */
    @Override
    public long pos() {
        return mPos;
    }

    /**
     * @return the configured chunk size, narrowed to {@code int}
     */
    @Override
    public int chunkSize() {
        return (int) mChunkSize;
    }

    /**
     * Appends the readable bytes of {@code byteBuf} to the block file, reserving more space on
     * the worker first if needed. The buffer is always released, whether or not the write
     * succeeds.
     *
     * @param byteBuf the chunk to write; released by this method
     * @throws IOException if reserving space or appending fails
     * @throws IllegalStateException if the stream is already canceled or closed, or if the
     *         number of bytes appended differs from the number of readable bytes
     */
    @Override
    public void writeChunk(ByteBuf byteBuf) throws IOException {
        try {
            Preconditions.checkState(!mStream.isCanceled() && !mStream.isClosed(), "DataWriter is closed while writing chunks.");
            int size = byteBuf.readableBytes();
            ensureReserved(mPos + size);
            // The position is advanced before the append, as in the original ordering.
            mPos += size;
            Preconditions.checkState(mWriter.append(byteBuf) == (long) size);
            MetricsSystem.counter(MetricKey.CLIENT_BYTES_WRITTEN_LOCAL.getName()).inc(size);
            MetricsSystem.meter(MetricKey.CLIENT_BYTES_WRITTEN_LOCAL_THROUGHPUT.getName()).mark(size);
        } finally {
            byteBuf.release();
        }
    }

    /**
     * Registers a stream cancellation with the closer and then closes the closer, which also
     * closes the resources registered in {@link #create}.
     *
     * @throws IOException if closing any registered resource fails
     */
    @Override
    public void cancel() throws IOException {
        mCloser.register(() -> mStream.cancel());
        mCloser.close();
    }

    /** Does nothing. */
    @Override
    public void flush() {
    }

    /**
     * Registers a stream close (followed by a wait for completion, bounded by the data timeout)
     * with the closer and then closes the closer, which also closes the resources registered in
     * {@link #create}.
     *
     * @throws IOException if closing any registered resource fails
     */
    @Override
    public void close() throws IOException {
        mCloser.register(() -> {
            mStream.close();
            mStream.waitForComplete(mDataTimeoutMs);
        });
        mCloser.close();
    }

    /**
     * Creates the writer from already-initialized parts; see {@link #create}.
     *
     * @param chunkSize the value to report from {@link #chunkSize()}
     * @param writer the writer for the block file
     * @param createRequest the create request that was sent; its space-to-reserve value seeds
     *        the reserved position
     * @param stream the stream on which the create request was sent
     * @param closer the closer owning the resources to release on cancel/close
     * @param fileBufferBytes lower bound on the size of each follow-up reservation
     * @param dataTimeoutMs timeout in milliseconds for stream operations
     */
    private LocalFileDataWriter(long chunkSize, LocalFileBlockWriter writer, CreateLocalBlockRequest createRequest, GrpcBlockingStream<CreateLocalBlockRequest, CreateLocalBlockResponse> stream, Closer closer, long fileBufferBytes, long dataTimeoutMs) {
        mFileBufferBytes = fileBufferBytes;
        mDataTimeoutMs = dataTimeoutMs;
        mCloser = closer;
        mWriter = writer;
        mCreateRequest = createRequest;
        mStream = stream;
        mPosReserved = createRequest.getSpaceToReserve();
        mChunkSize = chunkSize;
    }

    /**
     * Ensures that at least {@code pos} bytes are reserved. If not, sends a reserve-only request
     * for the larger of the shortfall and {@code mFileBufferBytes}, and waits for the response.
     *
     * @param pos the position that must be covered by the reservation
     * @throws IOException if sending the request or receiving the response fails
     * @throws IllegalStateException if the stream ends before a response arrives, or the
     *         response unexpectedly carries a path
     */
    private void ensureReserved(long pos) throws IOException {
        if (pos <= mPosReserved) {
            return;
        }
        long toReserve = Math.max(pos - mPosReserved, mFileBufferBytes);
        CreateLocalBlockRequest request = mCreateRequest.toBuilder()
                .setSpaceToReserve(toReserve)
                .setOnlyReserveSpace(true)
                .build();
        mStream.send(request, mDataTimeoutMs);
        CreateLocalBlockResponse response = mStream.receive(mDataTimeoutMs);
        // Template arguments are only formatted when the check fails, so the request is not
        // stringified on the success path.
        Preconditions.checkState(response != null, "Stream closed while waiting for reserve request %s", request);
        Preconditions.checkState(!response.hasPath(), "Invalid response for reserve request %s", request);
        mPosReserved += toReserve;
    }
}
