package org.apache.kafka.streams.state.internals;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.TreeMap;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.internals.RocksDBVersionedStore;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteBatchInterface;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/streams/state/internals/RocksDBVersionedStoreRestoreWriteBuffer.class */
public class RocksDBVersionedStoreRestoreWriteBuffer {
    private static final Logger log = LoggerFactory.getLogger(RocksDBVersionedStoreRestoreWriteBuffer.class);
    private final RocksDBVersionedStore.RocksDBVersionedStoreClient dbClient;
    private final Map<Bytes, Optional<byte[]>> latestValueWriteBuffer = new HashMap();
    private final TreeMap<Long, WriteBufferSegmentWithDbFallback> segmentsWriteBuffer = new TreeMap<>((l, l2) -> {
        return Long.compare(l2.longValue(), l.longValue());
    });
    private final RocksDBVersionedStoreRestoreClient restoreClient = new RocksDBVersionedStoreRestoreClient();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/streams/state/internals/RocksDBVersionedStoreRestoreWriteBuffer$RocksDBVersionedStoreRestoreClient.class */
    public class RocksDBVersionedStoreRestoreClient implements RocksDBVersionedStore.VersionedStoreClient<WriteBufferSegmentWithDbFallback> {
        private RocksDBVersionedStoreRestoreClient() {
        }

        @Override // org.apache.kafka.streams.state.internals.RocksDBVersionedStore.VersionedStoreClient
        public byte[] getLatestValue(Bytes bytes) {
            Optional optional = (Optional) RocksDBVersionedStoreRestoreWriteBuffer.this.latestValueWriteBuffer.get(bytes);
            return optional != null ? (byte[]) optional.orElse(null) : RocksDBVersionedStoreRestoreWriteBuffer.this.dbClient.getLatestValue(bytes);
        }

        @Override // org.apache.kafka.streams.state.internals.RocksDBVersionedStore.VersionedStoreClient
        public void putLatestValue(Bytes bytes, byte[] bArr) {
            RocksDBVersionedStoreRestoreWriteBuffer.this.latestValueWriteBuffer.put(bytes, Optional.ofNullable(bArr));
        }

        @Override // org.apache.kafka.streams.state.internals.RocksDBVersionedStore.VersionedStoreClient
        public void deleteLatestValue(Bytes bytes) {
            putLatestValue(bytes, null);
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.apache.kafka.streams.state.internals.RocksDBVersionedStore.VersionedStoreClient
        public WriteBufferSegmentWithDbFallback getOrCreateSegmentIfLive(long j, ProcessorContext processorContext, long j2) {
            if (RocksDBVersionedStoreRestoreWriteBuffer.this.segmentsWriteBuffer.containsKey(Long.valueOf(j))) {
                return (WriteBufferSegmentWithDbFallback) RocksDBVersionedStoreRestoreWriteBuffer.this.segmentsWriteBuffer.get(Long.valueOf(j));
            }
            LogicalKeyValueSegment orCreateSegmentIfLive = RocksDBVersionedStoreRestoreWriteBuffer.this.dbClient.getOrCreateSegmentIfLive(j, processorContext, j2);
            if (orCreateSegmentIfLive == null) {
                return null;
            }
            return new WriteBufferSegmentWithDbFallback(orCreateSegmentIfLive);
        }

        @Override // org.apache.kafka.streams.state.internals.RocksDBVersionedStore.VersionedStoreClient
        public List<WriteBufferSegmentWithDbFallback> getReverseSegments(long j) {
            ArrayList arrayList = new ArrayList(RocksDBVersionedStoreRestoreWriteBuffer.this.segmentsWriteBuffer.headMap(Long.valueOf(segmentIdForTimestamp(j)), true).values());
            List<LogicalKeyValueSegment> reverseSegments = RocksDBVersionedStoreRestoreWriteBuffer.this.dbClient.getReverseSegments(j);
            ArrayList arrayList2 = new ArrayList();
            int i = 0;
            int i2 = 0;
            while (i < reverseSegments.size() && i2 < arrayList.size()) {
                LogicalKeyValueSegment logicalKeyValueSegment = reverseSegments.get(i);
                WriteBufferSegmentWithDbFallback writeBufferSegmentWithDbFallback = (WriteBufferSegmentWithDbFallback) arrayList.get(i2);
                long id = logicalKeyValueSegment.id();
                long id2 = writeBufferSegmentWithDbFallback.id();
                if (id > id2) {
                    arrayList2.add(new WriteBufferSegmentWithDbFallback(logicalKeyValueSegment));
                    i++;
                } else if (id < id2) {
                    arrayList2.add(writeBufferSegmentWithDbFallback);
                    i2++;
                } else {
                    arrayList2.add(writeBufferSegmentWithDbFallback);
                    i++;
                    i2++;
                }
            }
            while (i < reverseSegments.size()) {
                arrayList2.add(new WriteBufferSegmentWithDbFallback(reverseSegments.get(i)));
                i++;
            }
            while (i2 < arrayList.size()) {
                arrayList2.add((WriteBufferSegmentWithDbFallback) arrayList.get(i2));
                i2++;
            }
            return arrayList2;
        }

        @Override // org.apache.kafka.streams.state.internals.RocksDBVersionedStore.VersionedStoreClient
        public long segmentIdForTimestamp(long j) {
            return RocksDBVersionedStoreRestoreWriteBuffer.this.dbClient.segmentIdForTimestamp(j);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/streams/state/internals/RocksDBVersionedStoreRestoreWriteBuffer$WriteBufferSegmentWithDbFallback.class */
    public class WriteBufferSegmentWithDbFallback implements RocksDBVersionedStore.VersionedStoreSegment {
        private final long id;
        private final Map<Bytes, byte[]> data = new HashMap();
        private final LogicalKeyValueSegment dbSegment;

        WriteBufferSegmentWithDbFallback(LogicalKeyValueSegment logicalKeyValueSegment) {
            this.dbSegment = (LogicalKeyValueSegment) Objects.requireNonNull(logicalKeyValueSegment);
            this.id = logicalKeyValueSegment.id();
            RocksDBVersionedStoreRestoreWriteBuffer.this.segmentsWriteBuffer.put(Long.valueOf(this.id), this);
        }

        LogicalKeyValueSegment dbSegment() {
            return this.dbSegment;
        }

        @Override // org.apache.kafka.streams.state.internals.RocksDBVersionedStore.VersionedStoreSegment
        public long id() {
            return this.id;
        }

        @Override // org.apache.kafka.streams.state.internals.RocksDBVersionedStore.VersionedStoreSegment
        public void put(Bytes bytes, byte[] bArr) {
            this.data.put(bytes, bArr);
        }

        @Override // org.apache.kafka.streams.state.internals.RocksDBVersionedStore.VersionedStoreSegment
        public byte[] get(Bytes bytes) {
            byte[] bArr = this.data.get(bytes);
            return bArr != null ? bArr : this.dbSegment.get(bytes);
        }

        Map<Bytes, byte[]> getAll() {
            return Collections.unmodifiableMap(this.data);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public RocksDBVersionedStoreRestoreWriteBuffer(RocksDBVersionedStore.RocksDBVersionedStoreClient rocksDBVersionedStoreClient) {
        this.dbClient = (RocksDBVersionedStore.RocksDBVersionedStoreClient) Objects.requireNonNull(rocksDBVersionedStoreClient);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public RocksDBVersionedStore.VersionedStoreClient<?> getClient() {
        return this.restoreClient;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void flush() throws RocksDBException {
        try {
            WriteBatchInterface writeBatch = new WriteBatch();
            try {
                List<WriteBufferSegmentWithDbFallback> reverseSegments = this.restoreClient.getReverseSegments(Long.MIN_VALUE);
                if (reverseSegments.size() > 0) {
                    for (WriteBufferSegmentWithDbFallback writeBufferSegmentWithDbFallback : reverseSegments) {
                        LogicalKeyValueSegment dbSegment = writeBufferSegmentWithDbFallback.dbSegment();
                        for (Map.Entry<Bytes, byte[]> entry : writeBufferSegmentWithDbFallback.getAll().entrySet()) {
                            dbSegment.addToBatch(new KeyValue<>(entry.getKey().get(), entry.getValue()), writeBatch);
                        }
                    }
                    reverseSegments.get(0).dbSegment().write(writeBatch);
                }
                writeBatch.close();
                this.segmentsWriteBuffer.clear();
                try {
                    writeBatch = new WriteBatch();
                    try {
                        for (Map.Entry<Bytes, Optional<byte[]>> entry2 : this.latestValueWriteBuffer.entrySet()) {
                            this.dbClient.addToLatestValueBatch(new KeyValue<>(entry2.getKey().get(), entry2.getValue().orElse(null)), writeBatch);
                        }
                        this.dbClient.writeLatestValues(writeBatch);
                        writeBatch.close();
                        this.latestValueWriteBuffer.clear();
                    } finally {
                        try {
                            writeBatch.close();
                        } catch (Throwable th) {
                            th.addSuppressed(th);
                        }
                    }
                } catch (RocksDBException e) {
                    log.error("Error restoring batch to RocksDBVersionedStore latest value store.");
                    throw e;
                }
            } finally {
            }
        } catch (RocksDBException e2) {
            log.error("Error restoring batch to RocksDBVersionedStore segments store.");
            throw e2;
        }
    }
}
