package org.apache.kafka.streams.state.internals;

import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ChangelogRecordDeserializationHelper;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.MockRocksDbConfigSetter;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.BloomFilter;
import org.rocksdb.Cache;
import org.rocksdb.Filter;
import org.rocksdb.LRUCache;
import org.rocksdb.Options;
import org.rocksdb.PlainTableConfig;
import org.rocksdb.RocksDB;
import org.rocksdb.Statistics;

@ExtendWith({MockitoExtension.class})
@MockitoSettings(strictness = Strictness.STRICT_STUBS)
/* loaded from: input_file:org/apache/kafka/streams/state/internals/RocksDBStoreTest.class */
public class RocksDBStoreTest extends AbstractKeyValueStoreTest {
    private static boolean enableBloomFilters = false;
    static final String DB_NAME = "db-name";
    static final String METRICS_SCOPE = "metrics-scope";
    private File dir;
    private final Time time = new MockTime();
    private final Serializer<String> stringSerializer = new StringSerializer();
    private final Deserializer<String> stringDeserializer = new StringDeserializer();

    @Mock
    private RocksDBMetricsRecorder metricsRecorder;
    InternalMockProcessorContext context;
    RocksDBStore rocksDBStore;

    /* loaded from: input_file:org/apache/kafka/streams/state/internals/RocksDBStoreTest$RocksDBConfigSetterWithUserProvidedNewBlockBasedTableFormatConfig.class */
    public static class RocksDBConfigSetterWithUserProvidedNewBlockBasedTableFormatConfig implements RocksDBConfigSetter {
        public void setConfig(String str, Options options, Map<String, Object> map) {
            options.setTableFormatConfig(new BlockBasedTableConfig());
        }

        public void close(String str, Options options) {
            options.statistics().close();
        }
    }

    /* loaded from: input_file:org/apache/kafka/streams/state/internals/RocksDBStoreTest$RocksDBConfigSetterWithUserProvidedNewPlainTableFormatConfig.class */
    public static class RocksDBConfigSetterWithUserProvidedNewPlainTableFormatConfig implements RocksDBConfigSetter {
        public void setConfig(String str, Options options, Map<String, Object> map) {
            options.setTableFormatConfig(new PlainTableConfig());
        }

        public void close(String str, Options options) {
            options.statistics().close();
        }
    }

    /* loaded from: input_file:org/apache/kafka/streams/state/internals/RocksDBStoreTest$RocksDBConfigSetterWithUserProvidedStatistics.class */
    public static class RocksDBConfigSetterWithUserProvidedStatistics implements RocksDBConfigSetter {
        protected static Statistics lastStatistics = null;

        public void setConfig(String str, Options options, Map<String, Object> map) {
            lastStatistics = new Statistics();
            options.setStatistics(lastStatistics);
        }

        public void close(String str, Options options) {
            Assertions.assertTrue(lastStatistics.isOwningHandle());
            lastStatistics.close();
            lastStatistics = null;
        }
    }

    /* loaded from: input_file:org/apache/kafka/streams/state/internals/RocksDBStoreTest$TestingBloomFilterRocksDBConfigSetter.class */
    public static class TestingBloomFilterRocksDBConfigSetter implements RocksDBConfigSetter {
        static boolean bloomFiltersSet;
        static Filter filter;
        static Cache cache;

        public void setConfig(String str, Options options, Map<String, Object> map) {
            BlockBasedTableConfig tableFormatConfig = options.tableFormatConfig();
            cache = new LRUCache(52428800L);
            tableFormatConfig.setBlockCache(cache);
            tableFormatConfig.setBlockSize(4096L);
            if (RocksDBStoreTest.enableBloomFilters) {
                filter = new BloomFilter();
                tableFormatConfig.setFilterPolicy(filter);
                options.optimizeFiltersForHits();
                bloomFiltersSet = true;
            } else {
                options.setOptimizeFiltersForHits(false);
                bloomFiltersSet = false;
            }
            options.setTableFormatConfig(tableFormatConfig);
        }

        public void close(String str, Options options) {
            if (filter != null) {
                filter.close();
            }
            cache.close();
        }
    }

    @BeforeEach
    public void setUp() {
        Properties streamsConfig = StreamsTestUtils.getStreamsConfig();
        streamsConfig.put("rocksdb.config.setter", MockRocksDbConfigSetter.class);
        this.dir = TestUtils.tempDirectory();
        this.context = new InternalMockProcessorContext(this.dir, Serdes.String(), Serdes.String(), new StreamsConfig(streamsConfig));
        this.rocksDBStore = getRocksDBStore();
    }

    @AfterEach
    public void tearDown() {
        this.rocksDBStore.close();
    }

    @Override // org.apache.kafka.streams.state.internals.AbstractKeyValueStoreTest
    protected <K, V> KeyValueStore<K, V> createKeyValueStore(StateStoreContext stateStoreContext) {
        KeyValueStore<K, V> build = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("my-store"), stateStoreContext.keySerde(), stateStoreContext.valueSerde()).build();
        build.init(stateStoreContext, build);
        return build;
    }

    RocksDBStore getRocksDBStore() {
        return new RocksDBStore(DB_NAME, METRICS_SCOPE);
    }

    private RocksDBStore getRocksDBStoreWithRocksDBMetricsRecorder() {
        return new RocksDBStore(DB_NAME, "rocksdb", this.metricsRecorder);
    }

    private RocksDBStore getRocksDBStoreWithCustomManagedIterators() {
        return new RocksDBStore(DB_NAME, "rocksdb", this.metricsRecorder, false);
    }

    private InternalMockProcessorContext getProcessorContext(Properties properties) {
        return new InternalMockProcessorContext(TestUtils.tempDirectory(), new StreamsConfig(properties));
    }

    private InternalMockProcessorContext getProcessorContext(Sensor.RecordingLevel recordingLevel, Class<? extends RocksDBConfigSetter> cls) {
        Properties streamsConfig = StreamsTestUtils.getStreamsConfig();
        streamsConfig.setProperty("metrics.recording.level", recordingLevel.name());
        streamsConfig.put("rocksdb.config.setter", cls);
        return getProcessorContext(streamsConfig);
    }

    private InternalMockProcessorContext getProcessorContext(Sensor.RecordingLevel recordingLevel) {
        Properties streamsConfig = StreamsTestUtils.getStreamsConfig();
        streamsConfig.setProperty("metrics.recording.level", recordingLevel.name());
        return getProcessorContext(streamsConfig);
    }

    @Test
    public void shouldAddValueProvidersWithoutStatisticsToInjectedMetricsRecorderWhenRecordingLevelInfo() {
        this.rocksDBStore = getRocksDBStoreWithRocksDBMetricsRecorder();
        this.context = getProcessorContext(Sensor.RecordingLevel.INFO);
        this.rocksDBStore.openDB(this.context.appConfigs(), this.context.stateDir());
        ((RocksDBMetricsRecorder) Mockito.verify(this.metricsRecorder)).addValueProviders((String) ArgumentMatchers.eq(DB_NAME), (RocksDB) ArgumentMatchers.notNull(), (Cache) ArgumentMatchers.notNull(), (Statistics) ArgumentMatchers.isNull());
    }

    @Test
    public void shouldAddValueProvidersWithStatisticsToInjectedMetricsRecorderWhenRecordingLevelDebug() {
        this.rocksDBStore = getRocksDBStoreWithRocksDBMetricsRecorder();
        this.context = getProcessorContext(Sensor.RecordingLevel.DEBUG);
        this.rocksDBStore.openDB(this.context.appConfigs(), this.context.stateDir());
        ((RocksDBMetricsRecorder) Mockito.verify(this.metricsRecorder)).addValueProviders((String) ArgumentMatchers.eq(DB_NAME), (RocksDB) ArgumentMatchers.notNull(), (Cache) ArgumentMatchers.notNull(), (Statistics) ArgumentMatchers.notNull());
    }

    @Test
    public void shouldRemoveValueProvidersFromInjectedMetricsRecorderOnClose() {
        this.rocksDBStore = getRocksDBStoreWithRocksDBMetricsRecorder();
        try {
            this.context = getProcessorContext(Sensor.RecordingLevel.DEBUG);
            this.rocksDBStore.openDB(this.context.appConfigs(), this.context.stateDir());
            ((RocksDBMetricsRecorder) Mockito.verify(this.metricsRecorder)).removeValueProviders(DB_NAME);
        } finally {
            this.rocksDBStore.close();
        }
    }

    @Test
    public void shouldNotSetStatisticsInValueProvidersWhenUserProvidesStatistics() {
        this.rocksDBStore = getRocksDBStoreWithRocksDBMetricsRecorder();
        this.context = getProcessorContext(Sensor.RecordingLevel.DEBUG, RocksDBConfigSetterWithUserProvidedStatistics.class);
        this.rocksDBStore.openDB(this.context.appConfigs(), this.context.stateDir());
        ((RocksDBMetricsRecorder) Mockito.verify(this.metricsRecorder)).addValueProviders((String) ArgumentMatchers.eq(DB_NAME), (RocksDB) ArgumentMatchers.notNull(), (Cache) ArgumentMatchers.notNull(), (Statistics) ArgumentMatchers.isNull());
    }

    @Test
    public void shouldCloseStatisticsWhenUserProvidesStatistics() throws Exception {
        this.rocksDBStore = getRocksDBStoreWithRocksDBMetricsRecorder();
        this.context = getProcessorContext(Sensor.RecordingLevel.DEBUG, RocksDBConfigSetterWithUserProvidedStatistics.class);
        this.rocksDBStore.openDB(this.context.appConfigs(), this.context.stateDir());
        Statistics statistics = RocksDBConfigSetterWithUserProvidedStatistics.lastStatistics;
        Statistics statistics2 = getStatistics(this.rocksDBStore);
        this.rocksDBStore.close();
        Assertions.assertFalse(statistics.isOwningHandle());
        Assertions.assertFalse(statistics2.isOwningHandle());
        Assertions.assertNull(getStatistics(this.rocksDBStore));
        Assertions.assertNull(RocksDBConfigSetterWithUserProvidedStatistics.lastStatistics);
    }

    @Test
    public void shouldSetStatisticsInValueProvidersWhenUserProvidesNoStatistics() throws Exception {
        this.rocksDBStore = getRocksDBStoreWithRocksDBMetricsRecorder();
        this.context = getProcessorContext(Sensor.RecordingLevel.DEBUG);
        this.rocksDBStore.openDB(this.context.appConfigs(), this.context.stateDir());
        ((RocksDBMetricsRecorder) Mockito.verify(this.metricsRecorder)).addValueProviders((String) ArgumentMatchers.eq(DB_NAME), (RocksDB) ArgumentMatchers.notNull(), (Cache) ArgumentMatchers.notNull(), (Statistics) ArgumentMatchers.eq(getStatistics(this.rocksDBStore)));
    }

    @Test
    public void shouldCloseStatisticsWhenUserProvidesNoStatistics() throws Exception {
        this.rocksDBStore = getRocksDBStoreWithRocksDBMetricsRecorder();
        this.context = getProcessorContext(Sensor.RecordingLevel.DEBUG);
        this.rocksDBStore.openDB(this.context.appConfigs(), this.context.stateDir());
        Statistics statistics = getStatistics(this.rocksDBStore);
        this.rocksDBStore.close();
        Assertions.assertFalse(statistics.isOwningHandle());
        Assertions.assertNull(getStatistics(this.rocksDBStore));
    }

    @Test
    public void shouldThrowWhenUserProvidesNewBlockBasedTableFormatConfig() {
        this.rocksDBStore = getRocksDBStoreWithRocksDBMetricsRecorder();
        this.context = getProcessorContext(Sensor.RecordingLevel.DEBUG, RocksDBConfigSetterWithUserProvidedNewBlockBasedTableFormatConfig.class);
        Assertions.assertThrows(ProcessorStateException.class, () -> {
            this.rocksDBStore.openDB(this.context.appConfigs(), this.context.stateDir());
        }, "The used block-based table format configuration does not expose the block cache. Use the BlockBasedTableConfig instance provided by Options#tableFormatConfig() to configure the block-based table format of RocksDB. Do not provide a new instance of BlockBasedTableConfig to the RocksDB options.");
    }

    @Test
    public void shouldNotSetCacheInValueProvidersWhenUserProvidesPlainTableFormatConfig() {
        this.rocksDBStore = getRocksDBStoreWithRocksDBMetricsRecorder();
        this.context = getProcessorContext(Sensor.RecordingLevel.DEBUG, RocksDBConfigSetterWithUserProvidedNewPlainTableFormatConfig.class);
        this.rocksDBStore.openDB(this.context.appConfigs(), this.context.stateDir());
        ((RocksDBMetricsRecorder) Mockito.verify(this.metricsRecorder)).addValueProviders((String) ArgumentMatchers.eq(DB_NAME), (RocksDB) ArgumentMatchers.notNull(), (Cache) ArgumentMatchers.isNull(), (Statistics) ArgumentMatchers.notNull());
    }

    @Test
    public void shouldNotThrowExceptionOnRestoreWhenThereIsPreExistingRocksDbFiles() {
        this.rocksDBStore.init(this.context, this.rocksDBStore);
        this.rocksDBStore.put(new Bytes("existingKey".getBytes(StandardCharsets.UTF_8)), "existingValue".getBytes(StandardCharsets.UTF_8));
        this.rocksDBStore.flush();
        ArrayList arrayList = new ArrayList();
        arrayList.add(KeyValue.pair("restoredKey".getBytes(StandardCharsets.UTF_8), "restoredValue".getBytes(StandardCharsets.UTF_8)));
        this.context.restore(DB_NAME, arrayList);
        MatcherAssert.assertThat((String) this.stringDeserializer.deserialize((String) null, this.rocksDBStore.get(new Bytes(this.stringSerializer.serialize((String) null, "restoredKey")))), CoreMatchers.equalTo("restoredValue"));
    }

    @Test
    public void shouldCallRocksDbConfigSetter() {
        MockRocksDbConfigSetter.called = false;
        Properties streamsConfig = StreamsTestUtils.getStreamsConfig();
        streamsConfig.put("rocksdb.config.setter", MockRocksDbConfigSetter.class);
        Object obj = new Object();
        streamsConfig.put("abc.def", obj);
        this.rocksDBStore.init(new InternalMockProcessorContext(this.dir, Serdes.String(), Serdes.String(), new StreamsConfig(streamsConfig)), this.rocksDBStore);
        Assertions.assertTrue(MockRocksDbConfigSetter.called);
        MatcherAssert.assertThat(MockRocksDbConfigSetter.configMap.get("abc.def"), CoreMatchers.equalTo(obj));
    }

    @Test
    public void shouldThrowProcessorStateExceptionOnOpeningReadOnlyDir() {
        File tempDirectory = TestUtils.tempDirectory();
        InternalMockProcessorContext internalMockProcessorContext = new InternalMockProcessorContext(tempDirectory, new StreamsConfig(StreamsTestUtils.getStreamsConfig()));
        Assertions.assertTrue(tempDirectory.setReadOnly());
        Assertions.assertThrows(ProcessorStateException.class, () -> {
            this.rocksDBStore.openDB(internalMockProcessorContext.appConfigs(), internalMockProcessorContext.stateDir());
        });
    }

    @Override // org.apache.kafka.streams.state.internals.AbstractKeyValueStoreTest
    @Test
    public void shouldPutAll() {
        ArrayList arrayList = new ArrayList();
        arrayList.add(new KeyValue(new Bytes(this.stringSerializer.serialize((String) null, "1")), this.stringSerializer.serialize((String) null, "a")));
        arrayList.add(new KeyValue(new Bytes(this.stringSerializer.serialize((String) null, "2")), this.stringSerializer.serialize((String) null, "b")));
        arrayList.add(new KeyValue(new Bytes(this.stringSerializer.serialize((String) null, "3")), this.stringSerializer.serialize((String) null, "c")));
        this.rocksDBStore.init(this.context, this.rocksDBStore);
        this.rocksDBStore.putAll(arrayList);
        this.rocksDBStore.flush();
        Assertions.assertEquals("a", this.stringDeserializer.deserialize((String) null, this.rocksDBStore.get(new Bytes(this.stringSerializer.serialize((String) null, "1")))));
        Assertions.assertEquals("b", this.stringDeserializer.deserialize((String) null, this.rocksDBStore.get(new Bytes(this.stringSerializer.serialize((String) null, "2")))));
        Assertions.assertEquals("c", this.stringDeserializer.deserialize((String) null, this.rocksDBStore.get(new Bytes(this.stringSerializer.serialize((String) null, "3")))));
    }

    @Test
    public void shouldMatchPositionAfterPut() {
        this.rocksDBStore.init(this.context, this.rocksDBStore);
        this.context.setRecordContext(new ProcessorRecordContext(0L, 1L, 0, "", new RecordHeaders()));
        this.rocksDBStore.put(new Bytes(this.stringSerializer.serialize((String) null, "one")), this.stringSerializer.serialize((String) null, "A"));
        this.context.setRecordContext(new ProcessorRecordContext(0L, 2L, 0, "", new RecordHeaders()));
        this.rocksDBStore.put(new Bytes(this.stringSerializer.serialize((String) null, "two")), this.stringSerializer.serialize((String) null, "B"));
        this.context.setRecordContext(new ProcessorRecordContext(0L, 3L, 0, "", new RecordHeaders()));
        this.rocksDBStore.put(new Bytes(this.stringSerializer.serialize((String) null, "three")), this.stringSerializer.serialize((String) null, "C"));
        Assertions.assertEquals(Position.fromMap(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("", Utils.mkMap(new Map.Entry[]{Utils.mkEntry(0, 3L)}))})), this.rocksDBStore.getPosition());
    }

    @Test
    public void shouldReturnKeysWithGivenPrefix() {
        ArrayList arrayList = new ArrayList();
        arrayList.add(new KeyValue(new Bytes(this.stringSerializer.serialize((String) null, "k1")), this.stringSerializer.serialize((String) null, "a")));
        arrayList.add(new KeyValue(new Bytes(this.stringSerializer.serialize((String) null, "prefix_3")), this.stringSerializer.serialize((String) null, "b")));
        arrayList.add(new KeyValue(new Bytes(this.stringSerializer.serialize((String) null, "k2")), this.stringSerializer.serialize((String) null, "c")));
        arrayList.add(new KeyValue(new Bytes(this.stringSerializer.serialize((String) null, "prefix_2")), this.stringSerializer.serialize((String) null, "d")));
        arrayList.add(new KeyValue(new Bytes(this.stringSerializer.serialize((String) null, "k3")), this.stringSerializer.serialize((String) null, "e")));
        arrayList.add(new KeyValue(new Bytes(this.stringSerializer.serialize((String) null, "prefix_1")), this.stringSerializer.serialize((String) null, "f")));
        this.rocksDBStore.init(this.context, this.rocksDBStore);
        this.rocksDBStore.putAll(arrayList);
        this.rocksDBStore.flush();
        KeyValueIterator prefixScan = this.rocksDBStore.prefixScan("prefix", this.stringSerializer);
        try {
            ArrayList arrayList2 = new ArrayList();
            int i = 0;
            while (prefixScan.hasNext()) {
                arrayList2.add(new String((byte[]) ((KeyValue) prefixScan.next()).value));
                i++;
            }
            MatcherAssert.assertThat(Integer.valueOf(i), CoreMatchers.is(3));
            MatcherAssert.assertThat((String) arrayList2.get(0), CoreMatchers.is("f"));
            MatcherAssert.assertThat((String) arrayList2.get(1), CoreMatchers.is("d"));
            MatcherAssert.assertThat((String) arrayList2.get(2), CoreMatchers.is("b"));
            if (prefixScan != null) {
                prefixScan.close();
            }
        } catch (Throwable th) {
            if (prefixScan != null) {
                try {
                    prefixScan.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void shouldReturnKeysWithGivenPrefixExcludingNextKeyLargestKey() {
        ArrayList arrayList = new ArrayList();
        arrayList.add(new KeyValue(new Bytes(this.stringSerializer.serialize((String) null, "abc")), this.stringSerializer.serialize((String) null, "f")));
        arrayList.add(new KeyValue(new Bytes(this.stringSerializer.serialize((String) null, "abcd")), this.stringSerializer.serialize((String) null, "f")));
        arrayList.add(new KeyValue(new Bytes(this.stringSerializer.serialize((String) null, "abce")), this.stringSerializer.serialize((String) null, "f")));
        this.rocksDBStore.init(this.context, this.rocksDBStore);
        this.rocksDBStore.putAll(arrayList);
        this.rocksDBStore.flush();
        KeyValueIterator prefixScan = this.rocksDBStore.prefixScan("abcd", this.stringSerializer);
        int i = 0;
        while (prefixScan.hasNext()) {
            try {
                ((Bytes) ((KeyValue) prefixScan.next()).key).get();
                i++;
            } catch (Throwable th) {
                if (prefixScan != null) {
                    try {
                        prefixScan.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }
        MatcherAssert.assertThat(Integer.valueOf(i), CoreMatchers.is(1));
        if (prefixScan != null) {
            prefixScan.close();
        }
    }

    @Test
    public void shouldAllowCustomManagedIterators() {
        this.rocksDBStore = getRocksDBStoreWithCustomManagedIterators();
        this.rocksDBStore.init(this.context, this.rocksDBStore);
        HashSet hashSet = new HashSet();
        KeyValueIterator prefixScan = this.rocksDBStore.prefixScan("abcd", this.stringSerializer, hashSet);
        MatcherAssert.assertThat(Integer.valueOf(hashSet.size()), CoreMatchers.is(1));
        prefixScan.close();
        MatcherAssert.assertThat(Integer.valueOf(hashSet.size()), CoreMatchers.is(0));
        KeyValueIterator range = this.rocksDBStore.range((Bytes) null, new Bytes(this.stringSerializer.serialize((String) null, "1")), hashSet);
        MatcherAssert.assertThat(Integer.valueOf(hashSet.size()), CoreMatchers.is(1));
        range.close();
        MatcherAssert.assertThat(Integer.valueOf(hashSet.size()), CoreMatchers.is(0));
        KeyValueIterator reverseRange = this.rocksDBStore.reverseRange((Bytes) null, new Bytes(this.stringSerializer.serialize((String) null, "1")), hashSet);
        MatcherAssert.assertThat(Integer.valueOf(hashSet.size()), CoreMatchers.is(1));
        reverseRange.close();
        MatcherAssert.assertThat(Integer.valueOf(hashSet.size()), CoreMatchers.is(0));
        KeyValueIterator all = this.rocksDBStore.all(hashSet);
        MatcherAssert.assertThat(Integer.valueOf(hashSet.size()), CoreMatchers.is(1));
        all.close();
        MatcherAssert.assertThat(Integer.valueOf(hashSet.size()), CoreMatchers.is(0));
        KeyValueIterator reverseAll = this.rocksDBStore.reverseAll(hashSet);
        MatcherAssert.assertThat(Integer.valueOf(hashSet.size()), CoreMatchers.is(1));
        reverseAll.close();
        MatcherAssert.assertThat(Integer.valueOf(hashSet.size()), CoreMatchers.is(0));
    }

    @Test
    public void shouldRequireOpenIteratorsWhenUsingCustomManagedIterators() {
        this.rocksDBStore = getRocksDBStoreWithCustomManagedIterators();
        this.rocksDBStore.init(this.context, this.rocksDBStore);
        Assertions.assertThrows(IllegalStateException.class, () -> {
            this.rocksDBStore.prefixScan("abcd", this.stringSerializer);
        });
        Assertions.assertThrows(IllegalStateException.class, () -> {
            this.rocksDBStore.range((Bytes) null, new Bytes(this.stringSerializer.serialize((String) null, "1")));
        });
        Assertions.assertThrows(IllegalStateException.class, () -> {
            this.rocksDBStore.reverseRange((Bytes) null, new Bytes(this.stringSerializer.serialize((String) null, "1")));
        });
        Assertions.assertThrows(IllegalStateException.class, () -> {
            this.rocksDBStore.all();
        });
        Assertions.assertThrows(IllegalStateException.class, () -> {
            this.rocksDBStore.reverseAll();
        });
    }

    @Test
    public void shouldNotAllowOpenIteratorsWhenUsingAutoManagedIterators() {
        this.rocksDBStore = getRocksDBStore();
        this.rocksDBStore.init(this.context, this.rocksDBStore);
        HashSet hashSet = new HashSet();
        Assertions.assertThrows(IllegalStateException.class, () -> {
            this.rocksDBStore.prefixScan("abcd", this.stringSerializer, hashSet);
        });
        Assertions.assertThrows(IllegalStateException.class, () -> {
            this.rocksDBStore.range((Bytes) null, new Bytes(this.stringSerializer.serialize((String) null, "1")), hashSet);
        });
        Assertions.assertThrows(IllegalStateException.class, () -> {
            this.rocksDBStore.reverseRange((Bytes) null, new Bytes(this.stringSerializer.serialize((String) null, "1")), hashSet);
        });
        Assertions.assertThrows(IllegalStateException.class, () -> {
            this.rocksDBStore.all(hashSet);
        });
        Assertions.assertThrows(IllegalStateException.class, () -> {
            this.rocksDBStore.reverseAll(hashSet);
        });
    }

    @Test
    public void shouldReturnUUIDsWithStringPrefix() {
        ArrayList arrayList = new ArrayList();
        Serializer serializer = Serdes.UUID().serializer();
        UUID randomUUID = UUID.randomUUID();
        UUID randomUUID2 = UUID.randomUUID();
        String substring = randomUUID.toString().substring(0, 4);
        int i = randomUUID2.toString().substring(0, 4).equals(substring) ? 2 : 1;
        arrayList.add(new KeyValue(new Bytes(serializer.serialize((String) null, randomUUID)), this.stringSerializer.serialize((String) null, "a")));
        arrayList.add(new KeyValue(new Bytes(serializer.serialize((String) null, randomUUID2)), this.stringSerializer.serialize((String) null, "b")));
        this.rocksDBStore.init(this.context, this.rocksDBStore);
        this.rocksDBStore.putAll(arrayList);
        this.rocksDBStore.flush();
        KeyValueIterator prefixScan = this.rocksDBStore.prefixScan(substring, this.stringSerializer);
        try {
            ArrayList arrayList2 = new ArrayList();
            int i2 = 0;
            while (prefixScan.hasNext()) {
                arrayList2.add(new String((byte[]) ((KeyValue) prefixScan.next()).value));
                i2++;
            }
            MatcherAssert.assertThat(Integer.valueOf(i2), CoreMatchers.is(Integer.valueOf(i)));
            if (i == 2) {
                MatcherAssert.assertThat((String) arrayList2.get(0), CoreMatchers.either(CoreMatchers.is("a")).or(CoreMatchers.is("b")));
            } else {
                MatcherAssert.assertThat((String) arrayList2.get(0), CoreMatchers.is("a"));
            }
            if (prefixScan != null) {
                prefixScan.close();
            }
        } catch (Throwable th) {
            if (prefixScan != null) {
                try {
                    prefixScan.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void shouldReturnNoKeys() {
        ArrayList arrayList = new ArrayList();
        arrayList.add(new KeyValue(new Bytes(this.stringSerializer.serialize((String) null, "a")), this.stringSerializer.serialize((String) null, "a")));
        arrayList.add(new KeyValue(new Bytes(this.stringSerializer.serialize((String) null, "b")), this.stringSerializer.serialize((String) null, "c")));
        arrayList.add(new KeyValue(new Bytes(this.stringSerializer.serialize((String) null, "c")), this.stringSerializer.serialize((String) null, "e")));
        this.rocksDBStore.init(this.context, this.rocksDBStore);
        this.rocksDBStore.putAll(arrayList);
        this.rocksDBStore.flush();
        KeyValueIterator prefixScan = this.rocksDBStore.prefixScan("d", this.stringSerializer);
        int i = 0;
        while (prefixScan.hasNext()) {
            try {
                prefixScan.next();
                i++;
            } catch (Throwable th) {
                if (prefixScan != null) {
                    try {
                        prefixScan.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }
        MatcherAssert.assertThat(Integer.valueOf(i), CoreMatchers.is(0));
        if (prefixScan != null) {
            prefixScan.close();
        }
    }

    @Test
    public void shouldRestoreAll() {
        List<KeyValue<byte[], byte[]>> keyValueEntries = getKeyValueEntries();
        this.rocksDBStore.init(this.context, this.rocksDBStore);
        this.context.restore(this.rocksDBStore.name(), keyValueEntries);
        Assertions.assertEquals("a", this.stringDeserializer.deserialize((String) null, this.rocksDBStore.get(new Bytes(this.stringSerializer.serialize((String) null, "1")))));
        Assertions.assertEquals("b", this.stringDeserializer.deserialize((String) null, this.rocksDBStore.get(new Bytes(this.stringSerializer.serialize((String) null, "2")))));
        Assertions.assertEquals("c", this.stringDeserializer.deserialize((String) null, this.rocksDBStore.get(new Bytes(this.stringSerializer.serialize((String) null, "3")))));
    }

    @Test
    public void shouldPutOnlyIfAbsentValue() {
        this.rocksDBStore.init(this.context, this.rocksDBStore);
        Bytes bytes = new Bytes(this.stringSerializer.serialize((String) null, "one"));
        byte[] serialize = this.stringSerializer.serialize((String) null, "A");
        byte[] serialize2 = this.stringSerializer.serialize((String) null, "B");
        this.rocksDBStore.putIfAbsent(bytes, serialize);
        this.rocksDBStore.putIfAbsent(bytes, serialize2);
        Assertions.assertEquals("A", (String) this.stringDeserializer.deserialize((String) null, this.rocksDBStore.get(bytes)));
    }

    @Test
    public void shouldHandleDeletesOnRestoreAll() {
        List<KeyValue<byte[], byte[]>> keyValueEntries = getKeyValueEntries();
        keyValueEntries.add(new KeyValue<>("1".getBytes(StandardCharsets.UTF_8), (Object) null));
        this.rocksDBStore.init(this.context, this.rocksDBStore);
        this.context.restore(this.rocksDBStore.name(), keyValueEntries);
        KeyValueIterator all = this.rocksDBStore.all();
        try {
            HashSet hashSet = new HashSet();
            while (all.hasNext()) {
                hashSet.add((String) this.stringDeserializer.deserialize((String) null, ((Bytes) ((KeyValue) all.next()).key).get()));
            }
            MatcherAssert.assertThat(hashSet, CoreMatchers.equalTo(Utils.mkSet(new String[]{"2", "3"})));
            if (all != null) {
                all.close();
            }
        } catch (Throwable th) {
            if (all != null) {
                try {
                    all.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void shouldHandleDeletesAndPutBackOnRestoreAll() {
        ArrayList arrayList = new ArrayList();
        arrayList.add(new KeyValue("1".getBytes(StandardCharsets.UTF_8), "a".getBytes(StandardCharsets.UTF_8)));
        arrayList.add(new KeyValue("2".getBytes(StandardCharsets.UTF_8), "b".getBytes(StandardCharsets.UTF_8)));
        arrayList.add(new KeyValue("1".getBytes(StandardCharsets.UTF_8), (Object) null));
        arrayList.add(new KeyValue("3".getBytes(StandardCharsets.UTF_8), "c".getBytes(StandardCharsets.UTF_8)));
        arrayList.add(new KeyValue("1".getBytes(StandardCharsets.UTF_8), "restored".getBytes(StandardCharsets.UTF_8)));
        this.rocksDBStore.init(this.context, this.rocksDBStore);
        this.context.restore(this.rocksDBStore.name(), arrayList);
        KeyValueIterator all = this.rocksDBStore.all();
        try {
            HashSet hashSet = new HashSet();
            while (all.hasNext()) {
                hashSet.add((String) this.stringDeserializer.deserialize((String) null, ((Bytes) ((KeyValue) all.next()).key).get()));
            }
            MatcherAssert.assertThat(hashSet, CoreMatchers.equalTo(Utils.mkSet(new String[]{"1", "2", "3"})));
            Assertions.assertEquals("restored", this.stringDeserializer.deserialize((String) null, this.rocksDBStore.get(new Bytes(this.stringSerializer.serialize((String) null, "1")))));
            Assertions.assertEquals("b", this.stringDeserializer.deserialize((String) null, this.rocksDBStore.get(new Bytes(this.stringSerializer.serialize((String) null, "2")))));
            Assertions.assertEquals("c", this.stringDeserializer.deserialize((String) null, this.rocksDBStore.get(new Bytes(this.stringSerializer.serialize((String) null, "3")))));
            if (all != null) {
                all.close();
            }
        } catch (Throwable th) {
            if (all != null) {
                try {
                    all.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void shouldRestoreThenDeleteOnRestoreAll() {
        List<KeyValue<byte[], byte[]>> keyValueEntries = getKeyValueEntries();
        this.rocksDBStore.init(this.context, this.rocksDBStore);
        this.context.restore(this.rocksDBStore.name(), keyValueEntries);
        Assertions.assertEquals("a", this.stringDeserializer.deserialize((String) null, this.rocksDBStore.get(new Bytes(this.stringSerializer.serialize((String) null, "1")))));
        Assertions.assertEquals("b", this.stringDeserializer.deserialize((String) null, this.rocksDBStore.get(new Bytes(this.stringSerializer.serialize((String) null, "2")))));
        Assertions.assertEquals("c", this.stringDeserializer.deserialize((String) null, this.rocksDBStore.get(new Bytes(this.stringSerializer.serialize((String) null, "3")))));
        keyValueEntries.clear();
        keyValueEntries.add(new KeyValue<>("2".getBytes(StandardCharsets.UTF_8), "b".getBytes(StandardCharsets.UTF_8)));
        keyValueEntries.add(new KeyValue<>("3".getBytes(StandardCharsets.UTF_8), "c".getBytes(StandardCharsets.UTF_8)));
        keyValueEntries.add(new KeyValue<>("1".getBytes(StandardCharsets.UTF_8), (Object) null));
        this.context.restore(this.rocksDBStore.name(), keyValueEntries);
        KeyValueIterator all = this.rocksDBStore.all();
        try {
            HashSet hashSet = new HashSet();
            while (all.hasNext()) {
                hashSet.add((String) this.stringDeserializer.deserialize((String) null, ((Bytes) ((KeyValue) all.next()).key).get()));
            }
            MatcherAssert.assertThat(hashSet, CoreMatchers.equalTo(Utils.mkSet(new String[]{"2", "3"})));
            if (all != null) {
                all.close();
            }
        } catch (Throwable th) {
            if (all != null) {
                try {
                    all.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void shouldThrowNullPointerExceptionOnNullPut() {
        this.rocksDBStore.init(this.context, this.rocksDBStore);
        Assertions.assertThrows(NullPointerException.class, () -> {
            this.rocksDBStore.put((Bytes) null, this.stringSerializer.serialize((String) null, "someVal"));
        });
    }

    @Test
    public void shouldThrowNullPointerExceptionOnNullPutAll() {
        this.rocksDBStore.init(this.context, this.rocksDBStore);
        Assertions.assertThrows(NullPointerException.class, () -> {
            this.rocksDBStore.put((Bytes) null, this.stringSerializer.serialize((String) null, "someVal"));
        });
    }

    @Test
    public void shouldThrowNullPointerExceptionOnNullGet() {
        this.rocksDBStore.init(this.context, this.rocksDBStore);
        Assertions.assertThrows(NullPointerException.class, () -> {
            this.rocksDBStore.get((Bytes) null);
        });
    }

    @Test
    public void shouldThrowNullPointerExceptionOnDelete() {
        this.rocksDBStore.init(this.context, this.rocksDBStore);
        Assertions.assertThrows(NullPointerException.class, () -> {
            this.rocksDBStore.delete((Bytes) null);
        });
    }

    @Test
    public void shouldReturnValueOnRange() {
        this.rocksDBStore.init(this.context, this.rocksDBStore);
        KeyValue keyValue = new KeyValue("0", "zero");
        KeyValue keyValue2 = new KeyValue("1", "one");
        KeyValue keyValue3 = new KeyValue("2", "two");
        this.rocksDBStore.put(new Bytes(((String) keyValue.key).getBytes(StandardCharsets.UTF_8)), ((String) keyValue.value).getBytes(StandardCharsets.UTF_8));
        this.rocksDBStore.put(new Bytes(((String) keyValue2.key).getBytes(StandardCharsets.UTF_8)), ((String) keyValue2.value).getBytes(StandardCharsets.UTF_8));
        this.rocksDBStore.put(new Bytes(((String) keyValue3.key).getBytes(StandardCharsets.UTF_8)), ((String) keyValue3.value).getBytes(StandardCharsets.UTF_8));
        LinkedList linkedList = new LinkedList();
        linkedList.add(keyValue);
        linkedList.add(keyValue2);
        KeyValueIterator<Bytes, byte[]> range = this.rocksDBStore.range((Bytes) null, new Bytes(this.stringSerializer.serialize((String) null, "1")));
        try {
            Assertions.assertEquals(linkedList, getDeserializedList(range));
            if (range != null) {
                range.close();
            }
        } catch (Throwable th) {
            if (range != null) {
                try {
                    range.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void shouldThrowProcessorStateExceptionOnPutDeletedDir() throws IOException {
        this.rocksDBStore.init(this.context, this.rocksDBStore);
        Utils.delete(this.dir);
        this.rocksDBStore.put(new Bytes(this.stringSerializer.serialize((String) null, "anyKey")), this.stringSerializer.serialize((String) null, "anyValue"));
        Assertions.assertThrows(ProcessorStateException.class, () -> {
            this.rocksDBStore.flush();
        });
    }

    @Test
    public void shouldHandleToggleOfEnablingBloomFilters() {
        Properties streamsConfig = StreamsTestUtils.getStreamsConfig();
        streamsConfig.put("rocksdb.config.setter", TestingBloomFilterRocksDBConfigSetter.class);
        this.dir = TestUtils.tempDirectory();
        this.context = new InternalMockProcessorContext(this.dir, Serdes.String(), Serdes.String(), new StreamsConfig(streamsConfig));
        enableBloomFilters = false;
        this.rocksDBStore.init(this.context, this.rocksDBStore);
        ArrayList arrayList = new ArrayList();
        arrayList.add("a");
        arrayList.add("b");
        arrayList.add("c");
        List<KeyValue<byte[], byte[]>> keyValueEntries = getKeyValueEntries();
        for (KeyValue<byte[], byte[]> keyValue : keyValueEntries) {
            this.rocksDBStore.put(new Bytes((byte[]) keyValue.key), (byte[]) keyValue.value);
        }
        int i = 0;
        Iterator<KeyValue<byte[], byte[]>> it = keyValueEntries.iterator();
        while (it.hasNext()) {
            int i2 = i;
            i++;
            MatcherAssert.assertThat(new String(this.rocksDBStore.get(new Bytes((byte[]) it.next().key)), StandardCharsets.UTF_8), CoreMatchers.is((String) arrayList.get(i2)));
        }
        Assertions.assertFalse(TestingBloomFilterRocksDBConfigSetter.bloomFiltersSet);
        this.rocksDBStore.close();
        int i3 = 0;
        enableBloomFilters = true;
        this.rocksDBStore.init(this.context, this.rocksDBStore);
        Iterator<KeyValue<byte[], byte[]>> it2 = keyValueEntries.iterator();
        while (it2.hasNext()) {
            int i4 = i3;
            i3++;
            MatcherAssert.assertThat(new String(this.rocksDBStore.get(new Bytes((byte[]) it2.next().key)), StandardCharsets.UTF_8), CoreMatchers.is((String) arrayList.get(i4)));
        }
        Assertions.assertTrue(TestingBloomFilterRocksDBConfigSetter.bloomFiltersSet);
    }

    @Test
    public void shouldVerifyThatMetricsRecordedFromStatisticsGetMeasurementsFromRocksDB() {
        TaskId taskId = new TaskId(0, 0);
        Metrics metrics = new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.DEBUG));
        StreamsMetricsImpl streamsMetricsImpl = new StreamsMetricsImpl(metrics, "test-application", "latest", this.time);
        this.context = (InternalMockProcessorContext) Mockito.mock(InternalMockProcessorContext.class);
        Mockito.when(this.context.metrics()).thenReturn(streamsMetricsImpl);
        Mockito.when(this.context.taskId()).thenReturn(taskId);
        Mockito.when(this.context.appConfigs()).thenReturn(new StreamsConfig(StreamsTestUtils.getStreamsConfig()).originals());
        Mockito.when(this.context.stateDir()).thenReturn(this.dir);
        Mockito.when(this.context.recordMetadata()).thenReturn(Optional.of(new MonotonicProcessorRecordContext("test", 0)));
        this.rocksDBStore.init(this.context, this.rocksDBStore);
        byte[] bytes = "hello".getBytes();
        this.rocksDBStore.put(Bytes.wrap(bytes), "world".getBytes());
        streamsMetricsImpl.rocksDBMetricsRecordingTrigger().run();
        MatcherAssert.assertThat(Double.valueOf(((Double) metrics.metric(new MetricName("bytes-written-total", "stream-state-metrics", "description is not verified", streamsMetricsImpl.storeLevelTagMap(taskId.toString(), METRICS_SCOPE, DB_NAME))).metricValue()).doubleValue()), Matchers.greaterThan(Double.valueOf(0.0d)));
    }

    @Test
    public void shouldVerifyThatMetricsRecordedFromPropertiesGetMeasurementsFromRocksDB() {
        TaskId taskId = new TaskId(0, 0);
        Metrics metrics = new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.INFO));
        StreamsMetricsImpl streamsMetricsImpl = new StreamsMetricsImpl(metrics, "test-application", "latest", this.time);
        this.context = (InternalMockProcessorContext) Mockito.mock(InternalMockProcessorContext.class);
        Mockito.when(this.context.metrics()).thenReturn(streamsMetricsImpl);
        Mockito.when(this.context.taskId()).thenReturn(taskId);
        Mockito.when(this.context.appConfigs()).thenReturn(new StreamsConfig(StreamsTestUtils.getStreamsConfig()).originals());
        Mockito.when(this.context.stateDir()).thenReturn(this.dir);
        Mockito.when(this.context.recordMetadata()).thenReturn(Optional.of(new MonotonicProcessorRecordContext("test", 0)));
        this.rocksDBStore.init(this.context, this.rocksDBStore);
        byte[] bytes = "hello".getBytes();
        this.rocksDBStore.put(Bytes.wrap(bytes), "world".getBytes());
        KafkaMetric metric = metrics.metric(new MetricName("num-entries-active-mem-table", "stream-state-metrics", "description is not verified", streamsMetricsImpl.storeLevelTagMap(taskId.toString(), METRICS_SCOPE, DB_NAME)));
        MatcherAssert.assertThat(metric, CoreMatchers.notNullValue());
        MatcherAssert.assertThat((BigInteger) metric.metricValue(), Matchers.greaterThan(BigInteger.valueOf(0L)));
    }

    @Test
    public void shouldVerifyThatPropertyBasedMetricsUseValidPropertyName() {
        TaskId taskId = new TaskId(0, 0);
        Metrics metrics = new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.INFO));
        StreamsMetricsImpl streamsMetricsImpl = new StreamsMetricsImpl(metrics, "test-application", "latest", this.time);
        Properties streamsConfig = StreamsTestUtils.getStreamsConfig();
        this.context = (InternalMockProcessorContext) Mockito.mock(InternalMockProcessorContext.class);
        Mockito.when(this.context.metrics()).thenReturn(streamsMetricsImpl);
        Mockito.when(this.context.taskId()).thenReturn(taskId);
        Mockito.when(this.context.appConfigs()).thenReturn(new StreamsConfig(streamsConfig).originals());
        Mockito.when(this.context.stateDir()).thenReturn(this.dir);
        this.rocksDBStore.init(this.context, this.rocksDBStore);
        for (String str : Arrays.asList("num-entries-active-mem-table", "num-deletes-active-mem-table", "num-entries-imm-mem-tables", "num-deletes-imm-mem-tables", "num-immutable-mem-table", "cur-size-active-mem-table", "cur-size-all-mem-tables", "size-all-mem-tables", "mem-table-flush-pending", "num-running-flushes", "compaction-pending", "num-running-compactions", "estimate-pending-compaction-bytes", "total-sst-files-size", "live-sst-files-size", "num-live-versions", "block-cache-capacity", "block-cache-usage", "block-cache-pinned-usage", "estimate-num-keys", "estimate-table-readers-mem", "background-errors")) {
            KafkaMetric metric = metrics.metric(new MetricName(str, "stream-state-metrics", "description is not verified", streamsMetricsImpl.storeLevelTagMap(taskId.toString(), METRICS_SCOPE, DB_NAME)));
            MatcherAssert.assertThat("Metric " + str + " not found!", metric, CoreMatchers.notNullValue());
            metric.metricValue();
        }
    }

    @Test
    public void shouldPerformRangeQueriesWithCachingDisabled() {
        this.context.setTime(1L);
        this.store.put(1, "hi");
        this.store.put(2, "goodbye");
        KeyValueIterator range = this.store.range(1, 2);
        try {
            Assertions.assertEquals("hi", ((KeyValue) range.next()).value);
            Assertions.assertEquals("goodbye", ((KeyValue) range.next()).value);
            Assertions.assertFalse(range.hasNext());
            if (range != null) {
                range.close();
            }
        } catch (Throwable th) {
            if (range != null) {
                try {
                    range.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void shouldPerformAllQueriesWithCachingDisabled() {
        this.context.setTime(1L);
        this.store.put(1, "hi");
        this.store.put(2, "goodbye");
        KeyValueIterator all = this.store.all();
        try {
            Assertions.assertEquals("hi", ((KeyValue) all.next()).value);
            Assertions.assertEquals("goodbye", ((KeyValue) all.next()).value);
            Assertions.assertFalse(all.hasNext());
            if (all != null) {
                all.close();
            }
        } catch (Throwable th) {
            if (all != null) {
                try {
                    all.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void shouldCloseOpenRangeIteratorsWhenStoreClosedAndThrowInvalidStateStoreOnHasNextAndNext() {
        this.context.setTime(1L);
        this.store.put(1, "hi");
        this.store.put(2, "goodbye");
        KeyValueIterator range = this.store.range(1, 5);
        try {
            KeyValueIterator range2 = this.store.range(1, 4);
            try {
                Assertions.assertTrue(range.hasNext());
                Assertions.assertTrue(range2.hasNext());
                this.store.close();
                Assertions.assertThrows(InvalidStateStoreException.class, () -> {
                    range.hasNext();
                });
                Assertions.assertThrows(InvalidStateStoreException.class, () -> {
                    range.next();
                });
                Assertions.assertThrows(InvalidStateStoreException.class, () -> {
                    range2.hasNext();
                });
                Assertions.assertThrows(InvalidStateStoreException.class, () -> {
                    range2.next();
                });
                if (range2 != null) {
                    range2.close();
                }
                if (range != null) {
                    range.close();
                }
            } catch (Throwable th) {
                if (range2 != null) {
                    try {
                        range2.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        } catch (Throwable th3) {
            if (range != null) {
                try {
                    range.close();
                } catch (Throwable th4) {
                    th3.addSuppressed(th4);
                }
            }
            throw th3;
        }
    }

    @Test
    public void shouldRestoreRecordsAndConsistencyVectorSingleTopic() {
        List<ConsumerRecord<byte[], byte[]>> changelogRecords = getChangelogRecords();
        Properties streamsConfig = StreamsTestUtils.getStreamsConfig();
        streamsConfig.put("rocksdb.config.setter", MockRocksDbConfigSetter.class);
        streamsConfig.put("__iq.consistency.offset.vector.enabled__", true);
        this.dir = TestUtils.tempDirectory();
        this.context = new InternalMockProcessorContext(this.dir, Serdes.String(), Serdes.String(), new StreamsConfig(streamsConfig));
        this.rocksDBStore.init(this.context, this.rocksDBStore);
        this.context.restoreWithHeaders(this.rocksDBStore.name(), changelogRecords);
        Assertions.assertEquals("a", this.stringDeserializer.deserialize((String) null, this.rocksDBStore.get(new Bytes(this.stringSerializer.serialize((String) null, "1")))));
        Assertions.assertEquals("b", this.stringDeserializer.deserialize((String) null, this.rocksDBStore.get(new Bytes(this.stringSerializer.serialize((String) null, "2")))));
        Assertions.assertEquals("c", this.stringDeserializer.deserialize((String) null, this.rocksDBStore.get(new Bytes(this.stringSerializer.serialize((String) null, "3")))));
        MatcherAssert.assertThat(this.rocksDBStore.getPosition(), Matchers.notNullValue());
        MatcherAssert.assertThat(this.rocksDBStore.getPosition().getPartitionPositions(""), Matchers.notNullValue());
        MatcherAssert.assertThat(this.rocksDBStore.getPosition().getPartitionPositions(""), Matchers.hasEntry(0, 3L));
    }

    @Test
    public void shouldRestoreRecordsAndConsistencyVectorMultipleTopics() {
        List<ConsumerRecord<byte[], byte[]>> changelogRecordsMultipleTopics = getChangelogRecordsMultipleTopics();
        Properties streamsConfig = StreamsTestUtils.getStreamsConfig();
        streamsConfig.put("rocksdb.config.setter", MockRocksDbConfigSetter.class);
        streamsConfig.put("__iq.consistency.offset.vector.enabled__", true);
        this.dir = TestUtils.tempDirectory();
        this.context = new InternalMockProcessorContext(this.dir, Serdes.String(), Serdes.String(), new StreamsConfig(streamsConfig));
        this.rocksDBStore.init(this.context, this.rocksDBStore);
        this.context.restoreWithHeaders(this.rocksDBStore.name(), changelogRecordsMultipleTopics);
        Assertions.assertEquals("a", this.stringDeserializer.deserialize((String) null, this.rocksDBStore.get(new Bytes(this.stringSerializer.serialize((String) null, "1")))));
        Assertions.assertEquals("b", this.stringDeserializer.deserialize((String) null, this.rocksDBStore.get(new Bytes(this.stringSerializer.serialize((String) null, "2")))));
        Assertions.assertEquals("c", this.stringDeserializer.deserialize((String) null, this.rocksDBStore.get(new Bytes(this.stringSerializer.serialize((String) null, "3")))));
        MatcherAssert.assertThat(this.rocksDBStore.getPosition(), Matchers.notNullValue());
        MatcherAssert.assertThat(this.rocksDBStore.getPosition().getPartitionPositions("A"), Matchers.notNullValue());
        MatcherAssert.assertThat(this.rocksDBStore.getPosition().getPartitionPositions("A"), Matchers.hasEntry(0, 3L));
        MatcherAssert.assertThat(this.rocksDBStore.getPosition().getPartitionPositions("B"), Matchers.notNullValue());
        MatcherAssert.assertThat(this.rocksDBStore.getPosition().getPartitionPositions("B"), Matchers.hasEntry(0, 2L));
    }

    @Test
    public void shouldHandleTombstoneRecords() {
        List<ConsumerRecord<byte[], byte[]>> changelogRecordsWithTombstones = getChangelogRecordsWithTombstones();
        Properties streamsConfig = StreamsTestUtils.getStreamsConfig();
        streamsConfig.put("rocksdb.config.setter", MockRocksDbConfigSetter.class);
        streamsConfig.put("__iq.consistency.offset.vector.enabled__", true);
        this.dir = TestUtils.tempDirectory();
        this.context = new InternalMockProcessorContext(this.dir, Serdes.String(), Serdes.String(), new StreamsConfig(streamsConfig));
        this.rocksDBStore.init(this.context, this.rocksDBStore);
        this.context.restoreWithHeaders(this.rocksDBStore.name(), changelogRecordsWithTombstones);
        Assertions.assertNull(this.stringDeserializer.deserialize((String) null, this.rocksDBStore.get(new Bytes(this.stringSerializer.serialize((String) null, "1")))));
        MatcherAssert.assertThat(this.rocksDBStore.getPosition(), Matchers.notNullValue());
        MatcherAssert.assertThat(this.rocksDBStore.getPosition().getPartitionPositions("A"), Matchers.hasEntry(0, 2L));
    }

    @Test
    public void shouldNotThrowWhenRestoringOnMissingHeaders() {
        List<KeyValue<byte[], byte[]>> changelogRecordsWithoutHeaders = getChangelogRecordsWithoutHeaders();
        Properties streamsConfig = StreamsTestUtils.getStreamsConfig();
        streamsConfig.put("rocksdb.config.setter", MockRocksDbConfigSetter.class);
        streamsConfig.put("__iq.consistency.offset.vector.enabled__", true);
        this.dir = TestUtils.tempDirectory();
        this.context = new InternalMockProcessorContext(this.dir, Serdes.String(), Serdes.String(), new StreamsConfig(streamsConfig));
        this.rocksDBStore.init(this.context, this.rocksDBStore);
        this.context.restore(this.rocksDBStore.name(), changelogRecordsWithoutHeaders);
        MatcherAssert.assertThat(this.rocksDBStore.getPosition(), CoreMatchers.is(Position.emptyPosition()));
    }

    private List<ConsumerRecord<byte[], byte[]>> getChangelogRecords() {
        ArrayList arrayList = new ArrayList();
        arrayList.add(createChangelogRecord("1".getBytes(StandardCharsets.UTF_8), "a".getBytes(StandardCharsets.UTF_8), "", 0, 1L));
        arrayList.add(createChangelogRecord("2".getBytes(StandardCharsets.UTF_8), "b".getBytes(StandardCharsets.UTF_8), "", 0, 2L));
        arrayList.add(createChangelogRecord("3".getBytes(StandardCharsets.UTF_8), "c".getBytes(StandardCharsets.UTF_8), "", 0, 3L));
        return arrayList;
    }

    private List<ConsumerRecord<byte[], byte[]>> getChangelogRecordsMultipleTopics() {
        ArrayList arrayList = new ArrayList();
        arrayList.add(createChangelogRecord("1".getBytes(StandardCharsets.UTF_8), "a".getBytes(StandardCharsets.UTF_8), "A", 0, 1L));
        arrayList.add(createChangelogRecord("2".getBytes(StandardCharsets.UTF_8), "b".getBytes(StandardCharsets.UTF_8), "B", 0, 2L));
        arrayList.add(createChangelogRecord("3".getBytes(StandardCharsets.UTF_8), "c".getBytes(StandardCharsets.UTF_8), "A", 0, 3L));
        return arrayList;
    }

    private List<ConsumerRecord<byte[], byte[]>> getChangelogRecordsWithTombstones() {
        ArrayList arrayList = new ArrayList();
        arrayList.add(createChangelogRecord("1".getBytes(StandardCharsets.UTF_8), "a".getBytes(StandardCharsets.UTF_8), "A", 0, 1L));
        arrayList.add(createChangelogRecord("1".getBytes(StandardCharsets.UTF_8), null, "A", 0, 2L));
        return arrayList;
    }

    private List<KeyValue<byte[], byte[]>> getChangelogRecordsWithoutHeaders() {
        ArrayList arrayList = new ArrayList();
        arrayList.add(new KeyValue("1".getBytes(StandardCharsets.UTF_8), "a".getBytes(StandardCharsets.UTF_8)));
        arrayList.add(new KeyValue("2".getBytes(StandardCharsets.UTF_8), "b".getBytes(StandardCharsets.UTF_8)));
        arrayList.add(new KeyValue("3".getBytes(StandardCharsets.UTF_8), "c".getBytes(StandardCharsets.UTF_8)));
        return arrayList;
    }

    private ConsumerRecord<byte[], byte[]> createChangelogRecord(byte[] bArr, byte[] bArr2, String str, int i, long j) {
        RecordHeaders recordHeaders = new RecordHeaders();
        Position withComponent = Position.emptyPosition().withComponent(str, i, j);
        recordHeaders.add(ChangelogRecordDeserializationHelper.CHANGELOG_VERSION_HEADER_RECORD_CONSISTENCY);
        recordHeaders.add(new RecordHeader("c", PositionSerde.serialize(withComponent).array()));
        return new ConsumerRecord<>("", 0, 0L, -1L, TimestampType.NO_TIMESTAMP_TYPE, -1, -1, bArr, bArr2, recordHeaders, Optional.empty());
    }

    private List<KeyValue<byte[], byte[]>> getKeyValueEntries() {
        ArrayList arrayList = new ArrayList();
        arrayList.add(new KeyValue("1".getBytes(StandardCharsets.UTF_8), "a".getBytes(StandardCharsets.UTF_8)));
        arrayList.add(new KeyValue("2".getBytes(StandardCharsets.UTF_8), "b".getBytes(StandardCharsets.UTF_8)));
        arrayList.add(new KeyValue("3".getBytes(StandardCharsets.UTF_8), "c".getBytes(StandardCharsets.UTF_8)));
        return arrayList;
    }

    private List<KeyValue<String, String>> getDeserializedList(KeyValueIterator<Bytes, byte[]> keyValueIterator) {
        return (List) Utils.toList(keyValueIterator).stream().map(keyValue -> {
            return new KeyValue(((Bytes) keyValue.key).toString(), (String) this.stringDeserializer.deserialize((String) null, (byte[]) keyValue.value));
        }).collect(Collectors.toList());
    }

    private Statistics getStatistics(RocksDBStore rocksDBStore) throws Exception {
        Field declaredField = rocksDBStore.getClass().getDeclaredField("statistics");
        declaredField.setAccessible(true);
        Statistics statistics = (Statistics) declaredField.get(rocksDBStore);
        declaredField.setAccessible(false);
        return statistics;
    }
}
