package org.apache.flink.connector.file.table.stream.compact;

import java.io.IOException;
import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.file.table.stream.compact.CompactWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter;
import org.apache.flink.util.function.SupplierWithException;

/**
 * A {@link CompactWriter} that writes records through an {@link InProgressFileWriter} opened by a
 * {@link BucketWriter} and, on commit, finalizes the file through that same bucket writer.
 *
 * <p>Instances are created only through the factory returned by {@link
 * #factory(SupplierWithException)}.
 *
 * @param <T> type of the records being written
 */
@Internal
public class CompactBucketWriter<T> implements CompactWriter<T> {

    /** Bucket writer that opened {@link #writer}; also used to commit the finished file. */
    private final BucketWriter<T, String> bucketWriter;

    /** In-progress file that receives every record passed to {@link #write(Object)}. */
    private final InProgressFileWriter<T, String> writer;

    /**
     * Factory that lazily obtains a single {@link BucketWriter} from the supplier and reuses it
     * for every {@link CompactBucketWriter} it creates afterwards.
     *
     * @param <T> type of the records being written
     */
    private static class Factory<T> implements CompactWriter.Factory<T> {

        /** Supplier invoked at most once per factory instance to obtain the bucket writer. */
        private final SupplierWithException<BucketWriter<T, String>, IOException> factory;

        /** Cached bucket writer; {@code null} until the first call to {@link #create}. */
        private BucketWriter<T, String> bucketWriter;

        public Factory(SupplierWithException<BucketWriter<T, String>, IOException> supplierWithException) {
            this.factory = supplierWithException;
        }

        /**
         * Opens a new in-progress file for the partition and path of the given context and wraps
         * it in a {@link CompactBucketWriter}.
         *
         * @param compactContext supplies the partition and the target path of the new file
         * @return a writer bound to the newly opened in-progress file
         * @throws IOException if the bucket writer cannot be obtained or the file cannot be opened
         */
        @Override
        public CompactWriter<T> create(CompactContext compactContext) throws IOException {
            if (bucketWriter == null) {
                // The supplier is already typed as BucketWriter<T, String>; no cast is required.
                bucketWriter = factory.get();
            }
            // NOTE(review): the trailing 0L is presumably the file creation time - confirm
            // against BucketWriter#openNewInProgressFile.
            InProgressFileWriter<T, String> inProgressFile =
                    bucketWriter.openNewInProgressFile(
                            compactContext.getPartition(), compactContext.getPath(), 0L);
            return new CompactBucketWriter<>(bucketWriter, inProgressFile);
        }
    }

    private CompactBucketWriter(BucketWriter<T, String> bucketWriter, InProgressFileWriter<T, String> inProgressFileWriter) {
        this.bucketWriter = bucketWriter;
        this.writer = inProgressFileWriter;
    }

    /**
     * Appends one record to the underlying in-progress file.
     *
     * @param t record to write
     * @throws IOException if the underlying writer fails
     */
    @Override
    public void write(T t) throws IOException {
        // NOTE(review): the constant 0L is presumably a timestamp argument - confirm against
        // InProgressFileWriter#write.
        writer.write(t, 0L);
    }

    /**
     * Closes the in-progress file for commit, hands the result to the bucket writer's
     * {@code recoverPendingFile} and commits the object it returns.
     *
     * @throws IOException if closing, recovering or committing fails
     */
    @Override
    public void commit() throws IOException {
        bucketWriter.recoverPendingFile(writer.closeForCommit()).commit();
    }

    /**
     * Creates a {@link CompactWriter.Factory} producing {@link CompactBucketWriter} instances.
     *
     * @param supplierWithException supplier of the {@link BucketWriter}; it is not invoked here
     *     but on the first {@code create} call of the returned factory
     * @param <T> type of the records being written
     * @return a factory backed by the supplied bucket writer
     */
    public static <T> CompactWriter.Factory<T> factory(SupplierWithException<BucketWriter<T, String>, IOException> supplierWithException) {
        return new Factory<>(supplierWithException);
    }
}
