package org.apache.kafka.streams.processor.internals;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.function.Supplier;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.test.TestRecord;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.class */
public class ProcessorTopologyTest {
    /** Name of the state store registered by the prefix-scan tests. */
    private static final String DEFAULT_STORE_NAME = "prefixScanStore";
    // Topic names shared by the topologies built in this test class.
    private static final String INPUT_TOPIC_1 = "input-topic-1";
    private static final String INPUT_TOPIC_2 = "input-topic-2";
    private static final String OUTPUT_TOPIC_1 = "output-topic-1";
    private static final String OUTPUT_TOPIC_2 = "output-topic-2";
    private static final String THROUGH_TOPIC_1 = "through-topic-1";
    /** Driver under test; assigned by individual tests and closed in {@link #cleanup()}. */
    private TopologyTestDriver driver;
    private static final Serializer<String> STRING_SERIALIZER = new StringSerializer();
    private static final Deserializer<String> STRING_DESERIALIZER = new StringDeserializer();
    /** Key prefix handed to {@code prefixScan}; also reused as the key of {@link #HEADER}. */
    private static final String DEFAULT_PREFIX = "key";
    // Encode the header value with an explicit charset so the bytes do not depend on the platform default.
    private static final Header HEADER = new RecordHeader(DEFAULT_PREFIX, "value".getBytes(java.nio.charset.StandardCharsets.UTF_8));
    private static final Headers HEADERS = new RecordHeaders(new Header[]{HEADER});
    /** Topology that each test populates before building or driving it. */
    private final TopologyWrapper topology = new TopologyWrapper();
    private final MockApiProcessorSupplier<?, ?, ?, ?> mockProcessorSupplier = new MockApiProcessorSupplier<>();
    /** Driver configuration; filled in {@link #setup()} and cleared in {@link #cleanup()}. */
    private final Properties props = new Properties();

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/ProcessorTopologyTest$AddHeaderProcessor.class */
    /**
     * Processor that adds {@link ProcessorTopologyTest#HEADER} to the headers of each
     * incoming record and forwards the result downstream.
     */
    public static class AddHeaderProcessor implements Processor<String, String, String, String> {
        private ProcessorContext<String, String> context;

        protected AddHeaderProcessor() {
        }

        /** Stores the context so that {@link #process(Record)} can forward through it. */
        public void init(ProcessorContext<String, String> processorContext) {
            context = processorContext;
        }

        /** Builds a record via {@code withHeaders}, appends the shared header and forwards it. */
        public void process(Record<String, String> record) {
            final Record<String, String> enriched = record.withHeaders(record.headers());
            enriched.headers().add(HEADER);
            context.forward(enriched);
        }
    }

    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/ProcessorTopologyTest$CustomTimestampExtractor.class */
    /**
     * Timestamp extractor used by the tests in this class.
     *
     * <p>When the record value's string form ends in {@code @<digits>}, those digits are the
     * timestamp. Otherwise the record's own timestamp is used if it is non-negative, and
     * {@link #DEFAULT_TIMESTAMP} if it is negative.
     */
    public static class CustomTimestampExtractor implements TimestampExtractor {
        private static final long DEFAULT_TIMESTAMP = 1000;

        /**
         * @param consumerRecord record whose value may carry an embedded {@code @<digits>} suffix
         * @param j              second extractor argument; not used by this implementation
         * @return the embedded timestamp, else the record timestamp, else {@link #DEFAULT_TIMESTAMP}
         */
        public long extract(ConsumerRecord<Object, Object> consumerRecord, long j) {
            final String value = consumerRecord.value().toString();
            if (value.matches(".*@[0-9]+")) {
                // The regex only guarantees digits after the LAST '@'. Parsing split("@")[1]
                // picked the wrong segment (and threw) for values containing several '@'.
                return Long.parseLong(value.substring(value.lastIndexOf('@') + 1));
            }
            final long recordTimestamp = consumerRecord.timestamp();
            return recordTimestamp >= 0 ? recordTimestamp : DEFAULT_TIMESTAMP;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/ProcessorTopologyTest$DroppingPartitioner.class */
    /**
     * Partitioner that selects no partition at all, regardless of how many partitions the
     * topic has. It is used by {@code testDrivingSimpleTopologyWithDroppingPartitioner}, which
     * expects the output topic to stay empty.
     */
    public static class DroppingPartitioner implements StreamPartitioner<String, String> {
        DroppingPartitioner() {
        }

        /** Deprecated single-partition variant; always returns {@code null}. */
        @Deprecated
        public Integer partition(String str, String str2, String str3, int i) {
            return null;
        }

        /**
         * Always returns an empty partition set.
         *
         * <p>The previous implementation returned every odd partition index below {@code i},
         * which is only empty when {@code i <= 1}; with more partitions it would select some.
         *
         * @return an {@link Optional} holding an empty set
         */
        public Optional<Set<Integer>> partitions(String str, String str2, String str3, int i) {
            return Optional.of(Collections.<Integer>emptySet());
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/ProcessorTopologyTest$FanOutTimestampProcessor.class */
    /**
     * Processor that forwards every record four times: unchanged without naming a child,
     * with timestamp + 5 to the first named child, unchanged to the second named child,
     * and with timestamp + 2 without naming a child.
     */
    public static class FanOutTimestampProcessor implements Processor<String, String, String, String> {
        private ProcessorContext<String, String> context;
        private final String firstChild;
        private final String secondChild;

        FanOutTimestampProcessor(String str, String str2) {
            firstChild = str;
            secondChild = str2;
        }

        /** Keeps the context for later forwarding. */
        public void init(ProcessorContext<String, String> processorContext) {
            context = processorContext;
        }

        /** Emits the four forwards in a fixed order; the order is kept exactly as before. */
        public void process(Record<String, String> record) {
            final long baseTimestamp = record.timestamp();
            context.forward(record);
            context.forward(record.withTimestamp(baseTimestamp + 5), firstChild);
            context.forward(record, secondChild);
            context.forward(record.withTimestamp(baseTimestamp + 2));
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/ProcessorTopologyTest$ForwardingProcessor.class */
    /** Pass-through processor: every record received is forwarded unchanged. */
    public static class ForwardingProcessor implements Processor<String, String, String, String> {
        private ProcessorContext<String, String> context;

        protected ForwardingProcessor() {
        }

        /** Remembers the context used for forwarding. */
        public void init(ProcessorContext<String, String> processorContext) {
            context = processorContext;
        }

        /** Forwards the record as-is. */
        public void process(Record<String, String> record) {
            context.forward(record);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/ProcessorTopologyTest$OldAPIStatefulProcessor.class */
    /**
     * Processor built on {@link AbstractProcessor} that writes every key/value pair it
     * receives into the state store whose name was given at construction.
     */
    public static class OldAPIStatefulProcessor extends AbstractProcessor<String, String> {
        private final String storeName;
        private KeyValueStore<String, String> store;

        OldAPIStatefulProcessor(String str) {
            storeName = str;
        }

        /** Initializes the superclass, then looks up the configured store from the context. */
        public void init(org.apache.kafka.streams.processor.ProcessorContext processorContext) {
            super.init(processorContext);
            store = processorContext.getStateStore(storeName);
        }

        /** Puts the key/value pair into the store. */
        public void process(String str, String str2) {
            store.put(str, str2);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/ProcessorTopologyTest$StatefulProcessor.class */
    /**
     * Processor that writes each record's key and value into the state store whose name
     * was given at construction. It never forwards anything.
     */
    public static class StatefulProcessor implements Processor<String, String, Void, Void> {
        private final String storeName;
        private KeyValueStore<String, String> store;

        StatefulProcessor(String str) {
            storeName = str;
        }

        /** Looks up the configured store from the context. */
        public void init(ProcessorContext<Void, Void> processorContext) {
            store = processorContext.getStateStore(storeName);
        }

        /** Puts the record's key/value pair into the store. */
        public void process(Record<String, String> record) {
            final String key = record.key();
            final String value = record.value();
            store.put(key, value);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/ProcessorTopologyTest$TimestampProcessor.class */
    /** Processor that forwards each record with its timestamp increased by 10. */
    public static class TimestampProcessor implements Processor<String, String, String, String> {
        private ProcessorContext<String, String> context;

        protected TimestampProcessor() {
        }

        /** Remembers the context used for forwarding. */
        public void init(ProcessorContext<String, String> processorContext) {
            context = processorContext;
        }

        /** Forwards the record re-stamped with {@code timestamp + 10}. */
        public void process(Record<String, String> record) {
            final long shiftedTimestamp = record.timestamp() + 10;
            context.forward(record.withTimestamp(shiftedTimestamp));
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/ProcessorTopologyTest$ValueTimestampProcessor.class */
    /**
     * Processor that forwards each record with its value replaced by the part of the
     * value that precedes the first {@code @} separator.
     */
    public static class ValueTimestampProcessor implements Processor<String, String, String, String> {
        private ProcessorContext<String, String> context;

        protected ValueTimestampProcessor() {
        }

        /** Remembers the context used for forwarding. */
        public void init(ProcessorContext<String, String> processorContext) {
            context = processorContext;
        }

        /** Forwards the record with the first {@code @}-separated segment as its value. */
        public void process(Record<String, String> record) {
            final String[] segments = record.value().split("@");
            context.forward(record.withValue(segments[0]));
        }
    }

    /**
     * Populates the driver configuration before each test: a temporary state directory,
     * String serdes as key/value defaults, and {@link CustomTimestampExtractor} as the
     * default timestamp extractor.
     */
    @BeforeEach
    public void setup() {
        final String stateDir = TestUtils.tempDirectory().getAbsolutePath();
        final String stringSerdeClass = Serdes.String().getClass().getName();

        props.setProperty("state.dir", stateDir);
        props.setProperty("default.key.serde", stringSerdeClass);
        props.setProperty("default.value.serde", stringSerdeClass);
        props.setProperty("default.timestamp.extractor", CustomTimestampExtractor.class.getName());
    }

    /** Clears the configuration and closes the driver (if one was created) after each test. */
    @AfterEach
    public void cleanup() {
        props.clear();
        final TopologyTestDriver current = driver;
        if (current != null) {
            current.close();
        }
        driver = null;
    }

    /**
     * Collects every entry returned by a prefix scan of the given store.
     *
     * <p>The iterator is managed with try-with-resources so it is closed on every exit
     * path, including when {@code hasNext()} itself throws. The previous hand-rolled
     * try/catch only covered {@code next()}.
     *
     * @param keyValueStore store to scan
     * @param str           key prefix, serialized with the String serde's serializer
     * @return the scanned entries in iteration order
     */
    private List<KeyValue<String, String>> prefixScanResults(KeyValueStore<String, String> keyValueStore, String str) {
        final List<KeyValue<String, String>> results = new ArrayList<>();
        try (KeyValueIterator<String, String> iterator = keyValueStore.prefixScan(str, Serdes.String().serializer())) {
            while (iterator.hasNext()) {
                results.add(iterator.next());
            }
        }
        return results;
    }

    /**
     * Builds a topology with two sources (three topics), two processors and two sinks,
     * then checks the node/topic counts and the topic-to-source lookups.
     */
    @Test
    public void testTopologyMetadata() {
        topology.addSource("source-1", "topic-1");
        topology.addSource("source-2", "topic-2", "topic-3");
        topology.addProcessor("processor-1", new MockApiProcessorSupplier<>(), "source-1");
        topology.addProcessor("processor-2", new MockApiProcessorSupplier<>(), "source-1", "source-2");
        topology.addSink("sink-1", "topic-3", "processor-1");
        topology.addSink("sink-2", "topic-4", "processor-1", "processor-2");

        final ProcessorTopology processorTopology = topology.getInternalBuilder("X").buildTopology();

        Assertions.assertEquals(6, processorTopology.processors().size());
        Assertions.assertEquals(2, processorTopology.sources().size());
        Assertions.assertEquals(3, processorTopology.sourceTopics().size());

        Assertions.assertNotNull(processorTopology.source("topic-1"));
        Assertions.assertNotNull(processorTopology.source("topic-2"));
        Assertions.assertNotNull(processorTopology.source("topic-3"));
        // topic-2 and topic-3 were registered on the same source node.
        Assertions.assertEquals(processorTopology.source("topic-2"), processorTopology.source("topic-3"));
    }

    /** Expects {@code terminalNodes()} to report exactly "processor-2" and "sink-1". */
    @Test
    public void shouldGetTerminalNodes() {
        topology.addSource("source-1", "topic-1");
        topology.addSource("source-2", "topic-2", "topic-3");
        topology.addProcessor("processor-1", new MockApiProcessorSupplier<>(), "source-1");
        topology.addProcessor("processor-2", new MockApiProcessorSupplier<>(), "source-1", "source-2");
        topology.addSink("sink-1", "topic-3", "processor-1");

        final ProcessorTopology processorTopology = topology.getInternalBuilder("X").buildTopology();

        MatcherAssert.assertThat(
            processorTopology.terminalNodes(),
            CoreMatchers.equalTo(Utils.mkSet("processor-2", "sink-1")));
    }

    /** After adding "topic-2" to source-1's topic list, the lookup for it resolves to source-1. */
    @Test
    public void shouldUpdateSourceTopicsWithNewMatchingTopic() {
        topology.addSource("source-1", "topic-1");
        final ProcessorTopology processorTopology = topology.getInternalBuilder("X").buildTopology();

        // Not known before the update.
        MatcherAssert.assertThat(processorTopology.source("topic-2"), CoreMatchers.is(CoreMatchers.nullValue()));

        processorTopology.updateSourceTopics(
            Collections.singletonMap("source-1", Arrays.asList("topic-1", "topic-2")));

        MatcherAssert.assertThat(processorTopology.source("topic-2").name(), CoreMatchers.equalTo("source-1"));
    }

    /** After dropping "topic-2" from source-1's topic list, the lookup for it yields null. */
    @Test
    public void shouldUpdateSourceTopicsWithRemovedTopic() {
        topology.addSource("source-1", "topic-1", "topic-2");
        final ProcessorTopology processorTopology = topology.getInternalBuilder("X").buildTopology();

        // Known before the update.
        MatcherAssert.assertThat(processorTopology.source("topic-2").name(), CoreMatchers.equalTo("source-1"));

        processorTopology.updateSourceTopics(
            Collections.singletonMap("source-1", Collections.singletonList("topic-1")));

        MatcherAssert.assertThat(processorTopology.source("topic-2"), CoreMatchers.is(CoreMatchers.nullValue()));
    }

    /** After replacing source-1's topic list with an empty one, "topic-1" no longer resolves. */
    @Test
    public void shouldUpdateSourceTopicsWithAllTopicsRemoved() {
        topology.addSource("source-1", "topic-1");
        final ProcessorTopology processorTopology = topology.getInternalBuilder("X").buildTopology();

        MatcherAssert.assertThat(processorTopology.source("topic-1").name(), CoreMatchers.equalTo("source-1"));

        processorTopology.updateSourceTopics(Collections.singletonMap("source-1", Collections.emptyList()));

        MatcherAssert.assertThat(processorTopology.source("topic-1"), CoreMatchers.is(CoreMatchers.nullValue()));
    }

    /**
     * The update map also names "source-2", which this topology does not contain. The
     * test expects its topic to stay unresolved and the source count to remain 1.
     */
    @Test
    public void shouldUpdateSourceTopicsOnlyForSourceNodesWithinTheSubtopology() {
        topology.addSource("source-1", "topic-1");
        final ProcessorTopology processorTopology = topology.getInternalBuilder("X").buildTopology();

        processorTopology.updateSourceTopics(Utils.mkMap(
            Utils.mkEntry("source-1", Collections.singletonList("topic-1")),
            Utils.mkEntry("source-2", Collections.singletonList("topic-2"))));

        MatcherAssert.assertThat(processorTopology.source("topic-2"), CoreMatchers.is(CoreMatchers.nullValue()));
        MatcherAssert.assertThat(Integer.valueOf(processorTopology.sources().size()), CoreMatchers.equalTo(1));
    }

    /**
     * The topology contains only "source-2" while the update map names only "source-1".
     * The test expects an {@link IllegalStateException} reporting source-2 as not found.
     */
    @Test
    public void shouldThrowIfSourceNodeToUpdateDoesNotExist() {
        topology.addSource("source-2", "topic-2");
        final ProcessorTopology processorTopology = topology.getInternalBuilder("X").buildTopology();

        final IllegalStateException thrown = Assertions.assertThrows(
            IllegalStateException.class,
            () -> processorTopology.updateSourceTopics(
                Collections.singletonMap("source-1", Collections.singletonList("topic-1"))));

        MatcherAssert.assertThat(thrown.getMessage(), CoreMatchers.is("Node source-2 not found in full topology"));
    }

    /**
     * The update map assigns "topic-1" to both source nodes. The test expects an
     * {@link IllegalStateException} whose message reports the duplicate registration.
     */
    @Test
    public void shouldThrowIfMultipleSourceNodeOfSameSubtopologySubscribedToSameTopic() {
        topology.addSource("source-1", "topic-1");
        topology.addSource("source-2", "topic-2");
        final ProcessorTopology processorTopology = topology.getInternalBuilder("X").buildTopology();

        final IllegalStateException thrown = Assertions.assertThrows(
            IllegalStateException.class,
            () -> processorTopology.updateSourceTopics(Utils.mkMap(
                Utils.mkEntry("source-1", Collections.singletonList("topic-1")),
                Utils.mkEntry("source-2", Arrays.asList("topic-2", "topic-1")))));

        MatcherAssert.assertThat(
            thrown.getMessage(),
            CoreMatchers.startsWith("Topic topic-1 was already registered to source node"));
    }

    /**
     * Pipes records through the simple topology and checks that each one shows up on the
     * output topic, in order, with nothing left over after each batch.
     */
    @Test
    public void testDrivingSimpleTopology() {
        driver = new TopologyTestDriver(createSimpleTopology(10), props);
        final TestInputTopic<String, String> input = driver.createInputTopic(
            INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER, Instant.ofEpochMilli(0L), Duration.ZERO);
        final TestOutputTopic<String, String> output = driver.createOutputTopic(
            OUTPUT_TOPIC_1, Serdes.String().deserializer(), Serdes.String().deserializer());

        // One record in, one record out.
        input.pipeInput("key1", "value1");
        assertNextOutputRecord(output.readRecord(), "key1", "value1");
        Assertions.assertTrue(output.isEmpty());

        input.pipeInput("key2", "value2");
        assertNextOutputRecord(output.readRecord(), "key2", "value2");
        Assertions.assertTrue(output.isEmpty());

        // Three records in a row, read back in the same order.
        input.pipeInput("key3", "value3");
        input.pipeInput("key4", "value4");
        input.pipeInput("key5", "value5");
        assertNextOutputRecord(output.readRecord(), "key3", "value3");
        assertNextOutputRecord(output.readRecord(), "key4", "value4");
        assertNextOutputRecord(output.readRecord(), "key5", "value5");
        Assertions.assertTrue(output.isEmpty());
    }

    /** With the dropping-partitioner topology, a piped record must not reach the output topic. */
    @Test
    public void testDrivingSimpleTopologyWithDroppingPartitioner() {
        driver = new TopologyTestDriver(createSimpleTopologyWithDroppingPartitioner(), props);
        final TestInputTopic<String, String> input = driver.createInputTopic(
            INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER, Instant.ofEpochMilli(0L), Duration.ZERO);
        final TestOutputTopic<String, String> output = driver.createOutputTopic(
            OUTPUT_TOPIC_1, Serdes.String().deserializer(), Serdes.String().deserializer());

        input.pipeInput("key1", "value1");

        Assertions.assertTrue(output.isEmpty());
    }

    /**
     * Drives the stateful topology and checks that the "entries" store holds the latest
     * value per key while nothing is written to the output topic.
     */
    @Test
    public void testDrivingStatefulTopology() {
        final String storeName = "entries";
        driver = new TopologyTestDriver(createStatefulTopology(storeName), props);
        final TestInputTopic<String, String> input =
            driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
        final TestOutputTopic<Integer, String> output =
            driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());

        // key1 is written twice; the second value must win.
        final String[][] inputs = {{"key1", "value1"}, {"key2", "value2"}, {"key3", "value3"}, {"key1", "value4"}};
        for (final String[] keyValue : inputs) {
            input.pipeInput(keyValue[0], keyValue[1]);
        }
        Assertions.assertTrue(output.isEmpty());

        final KeyValueStore<String, String> store = driver.getKeyValueStore(storeName);
        Assertions.assertEquals("value4", store.get("key1"));
        Assertions.assertEquals("value2", store.get("key2"));
        Assertions.assertEquals("value3", store.get("key3"));
        Assertions.assertNull(store.get("key4"));
    }

    /**
     * Drives the connected-state-store topology and checks that "connectedStore" holds
     * the latest value per key while nothing is written to the output topic.
     */
    @Test
    public void testDrivingConnectedStateStoreTopology() {
        final String storeName = "connectedStore";
        driver = new TopologyTestDriver(createConnectedStateStoreTopology(storeName), props);
        final TestInputTopic<String, String> input =
            driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
        final TestOutputTopic<Integer, String> output =
            driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());

        input.pipeInput("key1", "value1");
        input.pipeInput("key2", "value2");
        input.pipeInput("key3", "value3");
        input.pipeInput("key1", "value4"); // overwrites the first value for key1
        Assertions.assertTrue(output.isEmpty());

        final KeyValueStore<String, String> store = driver.getKeyValueStore(storeName);
        Assertions.assertEquals("value4", store.get("key1"));
        Assertions.assertEquals("value2", store.get("key2"));
        Assertions.assertEquals("value3", store.get("key3"));
        Assertions.assertNull(store.get("key4"));
    }

    /**
     * Two {@link OldAPIStatefulProcessor}s on different sources are wired to the same
     * store builder. Records piped into the first input topic must end up in the store,
     * with nothing written to the output topic.
     */
    @Test
    @Deprecated
    public void testDrivingConnectedStateStoreInDifferentProcessorsTopologyWithOldAPI() {
        final String storeName = "connectedStore";
        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
            Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), Serdes.String(), Serdes.String());
        topology
            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
            .addSource("source2", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_2)
            .addProcessor(
                "processor1",
                defineWithStoresOldAPI(() -> new OldAPIStatefulProcessor(storeName), Collections.singleton(storeBuilder)),
                "source1")
            .addProcessor(
                "processor2",
                defineWithStoresOldAPI(() -> new OldAPIStatefulProcessor(storeName), Collections.singleton(storeBuilder)),
                "source2")
            .addSink("counts", OUTPUT_TOPIC_1, "processor1", "processor2");

        driver = new TopologyTestDriver(topology, props);
        final TestInputTopic<String, String> input =
            driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
        final TestOutputTopic<Integer, String> output =
            driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());

        input.pipeInput("key1", "value1");
        input.pipeInput("key2", "value2");
        input.pipeInput("key3", "value3");
        input.pipeInput("key1", "value4");
        Assertions.assertTrue(output.isEmpty());

        final KeyValueStore<String, String> store = driver.getKeyValueStore(storeName);
        Assertions.assertEquals("value4", store.get("key1"));
        Assertions.assertEquals("value2", store.get("key2"));
        Assertions.assertEquals("value3", store.get("key3"));
        Assertions.assertNull(store.get("key4"));
    }

    /**
     * Two {@link StatefulProcessor}s on different sources are wired to the same store
     * builder. Records piped into the first input topic must end up in the store, with
     * nothing written to the output topic.
     */
    @Test
    public void testDrivingConnectedStateStoreInDifferentProcessorsTopology() {
        final String storeName = "connectedStore";
        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
            Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), Serdes.String(), Serdes.String());
        topology
            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
            .addSource("source2", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_2)
            .addProcessor(
                "processor1",
                defineWithStores(() -> new StatefulProcessor(storeName), Collections.singleton(storeBuilder)),
                "source1")
            .addProcessor(
                "processor2",
                defineWithStores(() -> new StatefulProcessor(storeName), Collections.singleton(storeBuilder)),
                "source2")
            .addSink("counts", OUTPUT_TOPIC_1, "processor1", "processor2");

        driver = new TopologyTestDriver(topology, props);
        final TestInputTopic<String, String> input =
            driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
        final TestOutputTopic<Integer, String> output =
            driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());

        input.pipeInput("key1", "value1");
        input.pipeInput("key2", "value2");
        input.pipeInput("key3", "value3");
        input.pipeInput("key1", "value4");
        Assertions.assertTrue(output.isEmpty());

        final KeyValueStore<String, String> store = driver.getKeyValueStore(storeName);
        Assertions.assertEquals("value4", store.get("key1"));
        Assertions.assertEquals("value2", store.get("key2"));
        Assertions.assertEquals("value3", store.get("key3"));
        Assertions.assertNull(store.get("key4"));
    }

    /**
     * Prefix scan over an in-memory store with caching and logging both disabled: the
     * first three scanned entries must be key1..key3 with the latest value per key.
     */
    @Test
    public void testPrefixScanInMemoryStoreNoCachingNoLogging() {
        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
            Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(DEFAULT_STORE_NAME), Serdes.String(), Serdes.String())
                .withCachingDisabled()
                .withLoggingDisabled();
        topology
            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
            .addProcessor(
                "processor1",
                defineWithStores(() -> new StatefulProcessor(DEFAULT_STORE_NAME), Collections.singleton(storeBuilder)),
                "source1")
            .addSink("counts", OUTPUT_TOPIC_1, "processor1");

        driver = new TopologyTestDriver(topology, props);
        final TestInputTopic<String, String> input =
            driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
        final TestOutputTopic<Integer, String> output =
            driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());

        input.pipeInput("key1", "value1");
        input.pipeInput("key2", "value2");
        input.pipeInput("key3", "value3");
        input.pipeInput("key1", "value4");
        Assertions.assertTrue(output.isEmpty());

        final List<KeyValue<String, String>> scanned =
            prefixScanResults(driver.getKeyValueStore(DEFAULT_STORE_NAME), DEFAULT_PREFIX);
        // Expected entries, checked position by position (key first, then value).
        final String[][] expected = {{"key1", "value4"}, {"key2", "value2"}, {"key3", "value3"}};
        for (int i = 0; i < expected.length; i++) {
            Assertions.assertEquals(expected[i][0], scanned.get(i).key);
            Assertions.assertEquals(expected[i][1], scanned.get(i).value);
        }
    }

    /**
     * Prefix scan over an in-memory store with caching enabled and logging disabled: the
     * first three scanned entries must be key1..key3 with the latest value per key.
     */
    @Test
    public void testPrefixScanInMemoryStoreWithCachingNoLogging() {
        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
            Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(DEFAULT_STORE_NAME), Serdes.String(), Serdes.String())
                .withCachingEnabled()
                .withLoggingDisabled();
        topology
            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
            .addProcessor(
                "processor1",
                defineWithStores(() -> new StatefulProcessor(DEFAULT_STORE_NAME), Collections.singleton(storeBuilder)),
                "source1")
            .addSink("counts", OUTPUT_TOPIC_1, "processor1");

        driver = new TopologyTestDriver(topology, props);
        final TestInputTopic<String, String> input =
            driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
        final TestOutputTopic<Integer, String> output =
            driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());

        input.pipeInput("key1", "value1");
        input.pipeInput("key2", "value2");
        input.pipeInput("key3", "value3");
        input.pipeInput("key1", "value4");
        Assertions.assertTrue(output.isEmpty());

        final List<KeyValue<String, String>> scanned =
            prefixScanResults(driver.getKeyValueStore(DEFAULT_STORE_NAME), DEFAULT_PREFIX);
        // Expected entries, checked position by position (key first, then value).
        final String[][] expected = {{"key1", "value4"}, {"key2", "value2"}, {"key3", "value3"}};
        for (int i = 0; i < expected.length; i++) {
            Assertions.assertEquals(expected[i][0], scanned.get(i).key);
            Assertions.assertEquals(expected[i][1], scanned.get(i).value);
        }
    }

    /**
     * Prefix scan over an in-memory store with caching enabled and logging enabled (empty
     * config map): the first three scanned entries must be key1..key3 with the latest
     * value per key.
     */
    @Test
    public void testPrefixScanInMemoryStoreWithCachingWithLogging() {
        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
            Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(DEFAULT_STORE_NAME), Serdes.String(), Serdes.String())
                .withCachingEnabled()
                .withLoggingEnabled(Collections.emptyMap());
        topology
            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
            .addProcessor(
                "processor1",
                defineWithStores(() -> new StatefulProcessor(DEFAULT_STORE_NAME), Collections.singleton(storeBuilder)),
                "source1")
            .addSink("counts", OUTPUT_TOPIC_1, "processor1");

        driver = new TopologyTestDriver(topology, props);
        final TestInputTopic<String, String> input =
            driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
        final TestOutputTopic<Integer, String> output =
            driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());

        input.pipeInput("key1", "value1");
        input.pipeInput("key2", "value2");
        input.pipeInput("key3", "value3");
        input.pipeInput("key1", "value4");
        Assertions.assertTrue(output.isEmpty());

        final List<KeyValue<String, String>> scanned =
            prefixScanResults(driver.getKeyValueStore(DEFAULT_STORE_NAME), DEFAULT_PREFIX);
        // Expected entries, checked position by position (key first, then value).
        final String[][] expected = {{"key1", "value4"}, {"key2", "value2"}, {"key3", "value3"}};
        for (int i = 0; i < expected.length; i++) {
            Assertions.assertEquals(expected[i][0], scanned.get(i).key);
            Assertions.assertEquals(expected[i][1], scanned.get(i).value);
        }
    }

    /**
     * Prefix scan over a persistent store with caching and logging both disabled: the
     * first three scanned entries must be key1..key3 with the latest value per key.
     */
    @Test
    public void testPrefixScanPersistentStoreNoCachingNoLogging() {
        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
            Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(DEFAULT_STORE_NAME), Serdes.String(), Serdes.String())
                .withCachingDisabled()
                .withLoggingDisabled();
        topology
            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
            .addProcessor(
                "processor1",
                defineWithStores(() -> new StatefulProcessor(DEFAULT_STORE_NAME), Collections.singleton(storeBuilder)),
                "source1")
            .addSink("counts", OUTPUT_TOPIC_1, "processor1");

        driver = new TopologyTestDriver(topology, props);
        final TestInputTopic<String, String> input =
            driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
        final TestOutputTopic<Integer, String> output =
            driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());

        input.pipeInput("key1", "value1");
        input.pipeInput("key2", "value2");
        input.pipeInput("key3", "value3");
        input.pipeInput("key1", "value4");
        Assertions.assertTrue(output.isEmpty());

        final List<KeyValue<String, String>> scanned =
            prefixScanResults(driver.getKeyValueStore(DEFAULT_STORE_NAME), DEFAULT_PREFIX);
        // Expected entries, checked position by position (key first, then value).
        final String[][] expected = {{"key1", "value4"}, {"key2", "value2"}, {"key3", "value3"}};
        for (int i = 0; i < expected.length; i++) {
            Assertions.assertEquals(expected[i][0], scanned.get(i).key);
            Assertions.assertEquals(expected[i][1], scanned.get(i).value);
        }
    }

    /**
     * Exercises a prefix scan against a store built from
     * {@code Stores.persistentKeyValueStore} with caching enabled and logging disabled.
     *
     * <p>Four records are piped in, two of them under {@code key1}. The output topic must
     * stay empty, and the scan for {@code DEFAULT_PREFIX} must yield {@code key1} (holding
     * the later {@code value4}), {@code key2} and {@code key3}, in that order.
     */
    @Test
    public void testPrefixScanPersistentStoreWithCachingNoLogging() {
        final StoreBuilder<KeyValueStore<String, String>> storeBuilder = Stores
            .keyValueStoreBuilder(Stores.persistentKeyValueStore(DEFAULT_STORE_NAME), Serdes.String(), Serdes.String())
            .withCachingEnabled()
            .withLoggingDisabled();
        this.topology
            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
            .addProcessor("processor1", defineWithStores(() -> new StatefulProcessor(DEFAULT_STORE_NAME), Collections.singleton(storeBuilder)), "source1")
            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
        this.driver = new TopologyTestDriver(this.topology, this.props);

        // Typed topic handles keep the pipeInput calls compile-time checked.
        final TestInputTopic<String, String> inputTopic = this.driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
        final TestOutputTopic<Integer, String> outputTopic = this.driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());

        inputTopic.pipeInput("key1", "value1");
        inputTopic.pipeInput("key2", "value2");
        inputTopic.pipeInput("key3", "value3");
        // Second write to key1: the scan below must observe this newer value.
        inputTopic.pipeInput("key1", "value4");
        Assertions.assertTrue(outputTopic.isEmpty());

        final List<KeyValue<String, String>> scanned = prefixScanResults(this.driver.getKeyValueStore(DEFAULT_STORE_NAME), DEFAULT_PREFIX);
        Assertions.assertEquals("key1", scanned.get(0).key);
        Assertions.assertEquals("value4", scanned.get(0).value);
        Assertions.assertEquals("key2", scanned.get(1).key);
        Assertions.assertEquals("value2", scanned.get(1).value);
        Assertions.assertEquals("key3", scanned.get(2).key);
        Assertions.assertEquals("value3", scanned.get(2).value);
    }

    /**
     * Exercises a prefix scan against a store built from
     * {@code Stores.persistentKeyValueStore} with caching enabled and logging enabled
     * (using an empty logging config map).
     *
     * <p>Four records are piped in, two of them under {@code key1}. The output topic must
     * stay empty, and the scan for {@code DEFAULT_PREFIX} must yield {@code key1} (holding
     * the later {@code value4}), {@code key2} and {@code key3}, in that order.
     */
    @Test
    public void testPrefixScanPersistentStoreWithCachingWithLogging() {
        final StoreBuilder<KeyValueStore<String, String>> storeBuilder = Stores
            .keyValueStoreBuilder(Stores.persistentKeyValueStore(DEFAULT_STORE_NAME), Serdes.String(), Serdes.String())
            .withCachingEnabled()
            .withLoggingEnabled(Collections.emptyMap());
        this.topology
            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
            .addProcessor("processor1", defineWithStores(() -> new StatefulProcessor(DEFAULT_STORE_NAME), Collections.singleton(storeBuilder)), "source1")
            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
        this.driver = new TopologyTestDriver(this.topology, this.props);

        // Typed topic handles keep the pipeInput calls compile-time checked.
        final TestInputTopic<String, String> inputTopic = this.driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
        final TestOutputTopic<Integer, String> outputTopic = this.driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());

        inputTopic.pipeInput("key1", "value1");
        inputTopic.pipeInput("key2", "value2");
        inputTopic.pipeInput("key3", "value3");
        // Second write to key1: the scan below must observe this newer value.
        inputTopic.pipeInput("key1", "value4");
        Assertions.assertTrue(outputTopic.isEmpty());

        final List<KeyValue<String, String>> scanned = prefixScanResults(this.driver.getKeyValueStore(DEFAULT_STORE_NAME), DEFAULT_PREFIX);
        Assertions.assertEquals("key1", scanned.get(0).key);
        Assertions.assertEquals("value4", scanned.get(0).value);
        Assertions.assertEquals("key2", scanned.get(1).key);
        Assertions.assertEquals("value2", scanned.get(1).value);
        Assertions.assertEquals("key3", scanned.get(2).key);
        Assertions.assertEquals("value3", scanned.get(2).value);
    }

    /**
     * Exercises a prefix scan against a store built from
     * {@code Stores.persistentTimestampedKeyValueStore} with caching and logging both
     * disabled.
     *
     * <p>Four records are piped in, two of them under {@code key1}. The output topic must
     * stay empty, and the scan for {@code DEFAULT_PREFIX} must yield {@code key1} (holding
     * the later {@code value4}), {@code key2} and {@code key3}, in that order.
     */
    @Test
    public void testPrefixScanPersistentTimestampedStoreNoCachingNoLogging() {
        final StoreBuilder<KeyValueStore<String, String>> storeBuilder = Stores
            .keyValueStoreBuilder(Stores.persistentTimestampedKeyValueStore(DEFAULT_STORE_NAME), Serdes.String(), Serdes.String())
            .withCachingDisabled()
            .withLoggingDisabled();
        this.topology
            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
            .addProcessor("processor1", defineWithStores(() -> new StatefulProcessor(DEFAULT_STORE_NAME), Collections.singleton(storeBuilder)), "source1")
            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
        this.driver = new TopologyTestDriver(this.topology, this.props);

        // Typed topic handles keep the pipeInput calls compile-time checked.
        final TestInputTopic<String, String> inputTopic = this.driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
        final TestOutputTopic<Integer, String> outputTopic = this.driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());

        inputTopic.pipeInput("key1", "value1");
        inputTopic.pipeInput("key2", "value2");
        inputTopic.pipeInput("key3", "value3");
        // Second write to key1: the scan below must observe this newer value.
        inputTopic.pipeInput("key1", "value4");
        Assertions.assertTrue(outputTopic.isEmpty());

        final List<KeyValue<String, String>> scanned = prefixScanResults(this.driver.getKeyValueStore(DEFAULT_STORE_NAME), DEFAULT_PREFIX);
        Assertions.assertEquals("key1", scanned.get(0).key);
        Assertions.assertEquals("value4", scanned.get(0).value);
        Assertions.assertEquals("key2", scanned.get(1).key);
        Assertions.assertEquals("value2", scanned.get(1).value);
        Assertions.assertEquals("key3", scanned.get(2).key);
        Assertions.assertEquals("value3", scanned.get(2).value);
    }

    /**
     * Exercises a prefix scan against a store built from
     * {@code Stores.persistentTimestampedKeyValueStore} with caching enabled and logging
     * disabled.
     *
     * <p>Four records are piped in, two of them under {@code key1}. The output topic must
     * stay empty, and the scan for {@code DEFAULT_PREFIX} must yield {@code key1} (holding
     * the later {@code value4}), {@code key2} and {@code key3}, in that order.
     */
    @Test
    public void testPrefixScanPersistentTimestampedStoreWithCachingNoLogging() {
        final StoreBuilder<KeyValueStore<String, String>> storeBuilder = Stores
            .keyValueStoreBuilder(Stores.persistentTimestampedKeyValueStore(DEFAULT_STORE_NAME), Serdes.String(), Serdes.String())
            .withCachingEnabled()
            .withLoggingDisabled();
        this.topology
            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
            .addProcessor("processor1", defineWithStores(() -> new StatefulProcessor(DEFAULT_STORE_NAME), Collections.singleton(storeBuilder)), "source1")
            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
        this.driver = new TopologyTestDriver(this.topology, this.props);

        // Typed topic handles keep the pipeInput calls compile-time checked.
        final TestInputTopic<String, String> inputTopic = this.driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
        final TestOutputTopic<Integer, String> outputTopic = this.driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());

        inputTopic.pipeInput("key1", "value1");
        inputTopic.pipeInput("key2", "value2");
        inputTopic.pipeInput("key3", "value3");
        // Second write to key1: the scan below must observe this newer value.
        inputTopic.pipeInput("key1", "value4");
        Assertions.assertTrue(outputTopic.isEmpty());

        final List<KeyValue<String, String>> scanned = prefixScanResults(this.driver.getKeyValueStore(DEFAULT_STORE_NAME), DEFAULT_PREFIX);
        Assertions.assertEquals("key1", scanned.get(0).key);
        Assertions.assertEquals("value4", scanned.get(0).value);
        Assertions.assertEquals("key2", scanned.get(1).key);
        Assertions.assertEquals("value2", scanned.get(1).value);
        Assertions.assertEquals("key3", scanned.get(2).key);
        Assertions.assertEquals("value3", scanned.get(2).value);
    }

    /**
     * Exercises a prefix scan against a store built from
     * {@code Stores.persistentTimestampedKeyValueStore} with caching enabled and logging
     * enabled (using an empty logging config map).
     *
     * <p>Four records are piped in, two of them under {@code key1}. The output topic must
     * stay empty, and the scan for {@code DEFAULT_PREFIX} must yield {@code key1} (holding
     * the later {@code value4}), {@code key2} and {@code key3}, in that order.
     */
    @Test
    public void testPrefixScanPersistentTimestampedStoreWithCachingWithLogging() {
        final StoreBuilder<KeyValueStore<String, String>> storeBuilder = Stores
            .keyValueStoreBuilder(Stores.persistentTimestampedKeyValueStore(DEFAULT_STORE_NAME), Serdes.String(), Serdes.String())
            .withCachingEnabled()
            .withLoggingEnabled(Collections.emptyMap());
        this.topology
            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
            .addProcessor("processor1", defineWithStores(() -> new StatefulProcessor(DEFAULT_STORE_NAME), Collections.singleton(storeBuilder)), "source1")
            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
        this.driver = new TopologyTestDriver(this.topology, this.props);

        // Typed topic handles keep the pipeInput calls compile-time checked.
        final TestInputTopic<String, String> inputTopic = this.driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
        final TestOutputTopic<Integer, String> outputTopic = this.driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());

        inputTopic.pipeInput("key1", "value1");
        inputTopic.pipeInput("key2", "value2");
        inputTopic.pipeInput("key3", "value3");
        // Second write to key1: the scan below must observe this newer value.
        inputTopic.pipeInput("key1", "value4");
        Assertions.assertTrue(outputTopic.isEmpty());

        final List<KeyValue<String, String>> scanned = prefixScanResults(this.driver.getKeyValueStore(DEFAULT_STORE_NAME), DEFAULT_PREFIX);
        Assertions.assertEquals("key1", scanned.get(0).key);
        Assertions.assertEquals("value4", scanned.get(0).value);
        Assertions.assertEquals("key2", scanned.get(1).key);
        Assertions.assertEquals("value2", scanned.get(1).value);
        Assertions.assertEquals("key3", scanned.get(2).key);
        Assertions.assertEquals("value3", scanned.get(2).value);
    }

    /**
     * Exercises a prefix scan against a store built from {@code Stores.lruMap} (created
     * with a max-entries argument of 100) with caching and logging both disabled.
     *
     * <p>Four records are piped in, two of them under {@code key1}. The output topic must
     * stay empty, and the scan for {@code DEFAULT_PREFIX} must yield {@code key1} (holding
     * the later {@code value4}), {@code key2} and {@code key3}, in that order.
     */
    @Test
    public void testPrefixScanLruMapNoCachingNoLogging() {
        final StoreBuilder<KeyValueStore<String, String>> storeBuilder = Stores
            .keyValueStoreBuilder(Stores.lruMap(DEFAULT_STORE_NAME, 100), Serdes.String(), Serdes.String())
            .withCachingDisabled()
            .withLoggingDisabled();
        this.topology
            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
            .addProcessor("processor1", defineWithStores(() -> new StatefulProcessor(DEFAULT_STORE_NAME), Collections.singleton(storeBuilder)), "source1")
            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
        this.driver = new TopologyTestDriver(this.topology, this.props);

        // Typed topic handles keep the pipeInput calls compile-time checked.
        final TestInputTopic<String, String> inputTopic = this.driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
        final TestOutputTopic<Integer, String> outputTopic = this.driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());

        inputTopic.pipeInput("key1", "value1");
        inputTopic.pipeInput("key2", "value2");
        inputTopic.pipeInput("key3", "value3");
        // Second write to key1: the scan below must observe this newer value.
        inputTopic.pipeInput("key1", "value4");
        Assertions.assertTrue(outputTopic.isEmpty());

        final List<KeyValue<String, String>> scanned = prefixScanResults(this.driver.getKeyValueStore(DEFAULT_STORE_NAME), DEFAULT_PREFIX);
        Assertions.assertEquals("key1", scanned.get(0).key);
        Assertions.assertEquals("value4", scanned.get(0).value);
        Assertions.assertEquals("key2", scanned.get(1).key);
        Assertions.assertEquals("value2", scanned.get(1).value);
        Assertions.assertEquals("key3", scanned.get(2).key);
        Assertions.assertEquals("value3", scanned.get(2).value);
    }

    /**
     * Exercises a prefix scan against a store built from {@code Stores.lruMap} (created
     * with a max-entries argument of 100) with caching enabled and logging disabled.
     *
     * <p>Four records are piped in, two of them under {@code key1}. The output topic must
     * stay empty, and the scan for {@code DEFAULT_PREFIX} must yield {@code key1} (holding
     * the later {@code value4}), {@code key2} and {@code key3}, in that order.
     */
    @Test
    public void testPrefixScanLruMapWithCachingNoLogging() {
        final StoreBuilder<KeyValueStore<String, String>> storeBuilder = Stores
            .keyValueStoreBuilder(Stores.lruMap(DEFAULT_STORE_NAME, 100), Serdes.String(), Serdes.String())
            .withCachingEnabled()
            .withLoggingDisabled();
        this.topology
            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
            .addProcessor("processor1", defineWithStores(() -> new StatefulProcessor(DEFAULT_STORE_NAME), Collections.singleton(storeBuilder)), "source1")
            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
        this.driver = new TopologyTestDriver(this.topology, this.props);

        // Typed topic handles keep the pipeInput calls compile-time checked.
        final TestInputTopic<String, String> inputTopic = this.driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
        final TestOutputTopic<Integer, String> outputTopic = this.driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());

        inputTopic.pipeInput("key1", "value1");
        inputTopic.pipeInput("key2", "value2");
        inputTopic.pipeInput("key3", "value3");
        // Second write to key1: the scan below must observe this newer value.
        inputTopic.pipeInput("key1", "value4");
        Assertions.assertTrue(outputTopic.isEmpty());

        final List<KeyValue<String, String>> scanned = prefixScanResults(this.driver.getKeyValueStore(DEFAULT_STORE_NAME), DEFAULT_PREFIX);
        Assertions.assertEquals("key1", scanned.get(0).key);
        Assertions.assertEquals("value4", scanned.get(0).value);
        Assertions.assertEquals("key2", scanned.get(1).key);
        Assertions.assertEquals("value2", scanned.get(1).value);
        Assertions.assertEquals("key3", scanned.get(2).key);
        Assertions.assertEquals("value3", scanned.get(2).value);
    }

    /**
     * Exercises a prefix scan against a store built from {@code Stores.lruMap} (created
     * with a max-entries argument of 100) with caching enabled and logging enabled (using
     * an empty logging config map).
     *
     * <p>Four records are piped in, two of them under {@code key1}. The output topic must
     * stay empty, and the scan for {@code DEFAULT_PREFIX} must yield {@code key1} (holding
     * the later {@code value4}), {@code key2} and {@code key3}, in that order.
     */
    @Test
    public void testPrefixScanLruMapWithCachingWithLogging() {
        final StoreBuilder<KeyValueStore<String, String>> storeBuilder = Stores
            .keyValueStoreBuilder(Stores.lruMap(DEFAULT_STORE_NAME, 100), Serdes.String(), Serdes.String())
            .withCachingEnabled()
            .withLoggingEnabled(Collections.emptyMap());
        this.topology
            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
            .addProcessor("processor1", defineWithStores(() -> new StatefulProcessor(DEFAULT_STORE_NAME), Collections.singleton(storeBuilder)), "source1")
            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
        this.driver = new TopologyTestDriver(this.topology, this.props);

        // Typed topic handles keep the pipeInput calls compile-time checked.
        final TestInputTopic<String, String> inputTopic = this.driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
        final TestOutputTopic<Integer, String> outputTopic = this.driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());

        inputTopic.pipeInput("key1", "value1");
        inputTopic.pipeInput("key2", "value2");
        inputTopic.pipeInput("key3", "value3");
        // Second write to key1: the scan below must observe this newer value.
        inputTopic.pipeInput("key1", "value4");
        Assertions.assertTrue(outputTopic.isEmpty());

        final List<KeyValue<String, String>> scanned = prefixScanResults(this.driver.getKeyValueStore(DEFAULT_STORE_NAME), DEFAULT_PREFIX);
        Assertions.assertEquals("key1", scanned.get(0).key);
        Assertions.assertEquals("value4", scanned.get(0).value);
        Assertions.assertEquals("key2", scanned.get(1).key);
        Assertions.assertEquals("value2", scanned.get(1).value);
        Assertions.assertEquals("key3", scanned.get(2).key);
        Assertions.assertEquals("value3", scanned.get(2).value);
    }

    /**
     * Exercises a prefix scan against a store built from
     * {@code Stores.inMemoryKeyValueStore} with caching and logging both disabled, wired
     * through {@code defineWithStoresOldAPI} with an {@code OldAPIStatefulProcessor}.
     *
     * <p>Four records are piped in, two of them under {@code key1}. The output topic must
     * stay empty, and the scan for {@code DEFAULT_PREFIX} must yield {@code key1} (holding
     * the later {@code value4}), {@code key2} and {@code key3}, in that order.
     */
    @Test
    @Deprecated
    public void testPrefixScanInMemoryStoreNoCachingNoLoggingOldProcessor() {
        final StoreBuilder<KeyValueStore<String, String>> storeBuilder = Stores
            .keyValueStoreBuilder(Stores.inMemoryKeyValueStore(DEFAULT_STORE_NAME), Serdes.String(), Serdes.String())
            .withCachingDisabled()
            .withLoggingDisabled();
        this.topology
            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
            .addProcessor("processor1", defineWithStoresOldAPI(() -> new OldAPIStatefulProcessor(DEFAULT_STORE_NAME), Collections.singleton(storeBuilder)), "source1")
            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
        this.driver = new TopologyTestDriver(this.topology, this.props);

        // Typed topic handles keep the pipeInput calls compile-time checked.
        final TestInputTopic<String, String> inputTopic = this.driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
        final TestOutputTopic<Integer, String> outputTopic = this.driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());

        inputTopic.pipeInput("key1", "value1");
        inputTopic.pipeInput("key2", "value2");
        inputTopic.pipeInput("key3", "value3");
        // Second write to key1: the scan below must observe this newer value.
        inputTopic.pipeInput("key1", "value4");
        Assertions.assertTrue(outputTopic.isEmpty());

        final List<KeyValue<String, String>> scanned = prefixScanResults(this.driver.getKeyValueStore(DEFAULT_STORE_NAME), DEFAULT_PREFIX);
        Assertions.assertEquals("key1", scanned.get(0).key);
        Assertions.assertEquals("value4", scanned.get(0).value);
        Assertions.assertEquals("key2", scanned.get(1).key);
        Assertions.assertEquals("value2", scanned.get(1).value);
        Assertions.assertEquals("key3", scanned.get(2).key);
        Assertions.assertEquals("value3", scanned.get(2).value);
    }

    /**
     * Exercises a prefix scan against a store built from
     * {@code Stores.inMemoryKeyValueStore} with caching enabled and logging disabled,
     * wired through {@code defineWithStoresOldAPI} with an {@code OldAPIStatefulProcessor}.
     *
     * <p>Four records are piped in, two of them under {@code key1}. The output topic must
     * stay empty, and the scan for {@code DEFAULT_PREFIX} must yield {@code key1} (holding
     * the later {@code value4}), {@code key2} and {@code key3}, in that order.
     */
    @Test
    @Deprecated
    public void testPrefixScanInMemoryStoreWithCachingNoLoggingOldProcessor() {
        final StoreBuilder<KeyValueStore<String, String>> storeBuilder = Stores
            .keyValueStoreBuilder(Stores.inMemoryKeyValueStore(DEFAULT_STORE_NAME), Serdes.String(), Serdes.String())
            .withCachingEnabled()
            .withLoggingDisabled();
        this.topology
            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
            .addProcessor("processor1", defineWithStoresOldAPI(() -> new OldAPIStatefulProcessor(DEFAULT_STORE_NAME), Collections.singleton(storeBuilder)), "source1")
            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
        this.driver = new TopologyTestDriver(this.topology, this.props);

        // Typed topic handles keep the pipeInput calls compile-time checked.
        final TestInputTopic<String, String> inputTopic = this.driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
        final TestOutputTopic<Integer, String> outputTopic = this.driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());

        inputTopic.pipeInput("key1", "value1");
        inputTopic.pipeInput("key2", "value2");
        inputTopic.pipeInput("key3", "value3");
        // Second write to key1: the scan below must observe this newer value.
        inputTopic.pipeInput("key1", "value4");
        Assertions.assertTrue(outputTopic.isEmpty());

        final List<KeyValue<String, String>> scanned = prefixScanResults(this.driver.getKeyValueStore(DEFAULT_STORE_NAME), DEFAULT_PREFIX);
        Assertions.assertEquals("key1", scanned.get(0).key);
        Assertions.assertEquals("value4", scanned.get(0).value);
        Assertions.assertEquals("key2", scanned.get(1).key);
        Assertions.assertEquals("value2", scanned.get(1).value);
        Assertions.assertEquals("key3", scanned.get(2).key);
        Assertions.assertEquals("value3", scanned.get(2).value);
    }

    /**
     * Exercises a prefix scan against a store built from
     * {@code Stores.inMemoryKeyValueStore} with caching enabled and logging enabled (using
     * an empty logging config map), wired through {@code defineWithStoresOldAPI} with an
     * {@code OldAPIStatefulProcessor}.
     *
     * <p>Four records are piped in, two of them under {@code key1}. The output topic must
     * stay empty, and the scan for {@code DEFAULT_PREFIX} must yield {@code key1} (holding
     * the later {@code value4}), {@code key2} and {@code key3}, in that order.
     */
    @Test
    @Deprecated
    public void testPrefixScanInMemoryStoreWithCachingWithLoggingOldProcessor() {
        final StoreBuilder<KeyValueStore<String, String>> storeBuilder = Stores
            .keyValueStoreBuilder(Stores.inMemoryKeyValueStore(DEFAULT_STORE_NAME), Serdes.String(), Serdes.String())
            .withCachingEnabled()
            .withLoggingEnabled(Collections.emptyMap());
        this.topology
            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
            .addProcessor("processor1", defineWithStoresOldAPI(() -> new OldAPIStatefulProcessor(DEFAULT_STORE_NAME), Collections.singleton(storeBuilder)), "source1")
            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
        this.driver = new TopologyTestDriver(this.topology, this.props);

        // Typed topic handles keep the pipeInput calls compile-time checked.
        final TestInputTopic<String, String> inputTopic = this.driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
        final TestOutputTopic<Integer, String> outputTopic = this.driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());

        inputTopic.pipeInput("key1", "value1");
        inputTopic.pipeInput("key2", "value2");
        inputTopic.pipeInput("key3", "value3");
        // Second write to key1: the scan below must observe this newer value.
        inputTopic.pipeInput("key1", "value4");
        Assertions.assertTrue(outputTopic.isEmpty());

        final List<KeyValue<String, String>> scanned = prefixScanResults(this.driver.getKeyValueStore(DEFAULT_STORE_NAME), DEFAULT_PREFIX);
        Assertions.assertEquals("key1", scanned.get(0).key);
        Assertions.assertEquals("value4", scanned.get(0).value);
        Assertions.assertEquals("key2", scanned.get(1).key);
        Assertions.assertEquals("value2", scanned.get(1).value);
        Assertions.assertEquals("key3", scanned.get(2).key);
        Assertions.assertEquals("value3", scanned.get(2).value);
    }

    /**
     * Exercises a prefix scan against a store built from
     * {@code Stores.persistentKeyValueStore} with caching and logging both disabled, wired
     * through {@code defineWithStoresOldAPI} with an {@code OldAPIStatefulProcessor}.
     *
     * <p>Four records are piped in, two of them under {@code key1}. The output topic must
     * stay empty, and the scan for {@code DEFAULT_PREFIX} must yield {@code key1} (holding
     * the later {@code value4}), {@code key2} and {@code key3}, in that order.
     */
    @Test
    @Deprecated
    public void testPrefixScanPersistentStoreNoCachingNoLoggingOldProcessor() {
        final StoreBuilder<KeyValueStore<String, String>> storeBuilder = Stores
            .keyValueStoreBuilder(Stores.persistentKeyValueStore(DEFAULT_STORE_NAME), Serdes.String(), Serdes.String())
            .withCachingDisabled()
            .withLoggingDisabled();
        this.topology
            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
            .addProcessor("processor1", defineWithStoresOldAPI(() -> new OldAPIStatefulProcessor(DEFAULT_STORE_NAME), Collections.singleton(storeBuilder)), "source1")
            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
        this.driver = new TopologyTestDriver(this.topology, this.props);

        // Typed topic handles keep the pipeInput calls compile-time checked.
        final TestInputTopic<String, String> inputTopic = this.driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
        final TestOutputTopic<Integer, String> outputTopic = this.driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());

        inputTopic.pipeInput("key1", "value1");
        inputTopic.pipeInput("key2", "value2");
        inputTopic.pipeInput("key3", "value3");
        // Second write to key1: the scan below must observe this newer value.
        inputTopic.pipeInput("key1", "value4");
        Assertions.assertTrue(outputTopic.isEmpty());

        final List<KeyValue<String, String>> scanned = prefixScanResults(this.driver.getKeyValueStore(DEFAULT_STORE_NAME), DEFAULT_PREFIX);
        Assertions.assertEquals("key1", scanned.get(0).key);
        Assertions.assertEquals("value4", scanned.get(0).value);
        Assertions.assertEquals("key2", scanned.get(1).key);
        Assertions.assertEquals("value2", scanned.get(1).value);
        Assertions.assertEquals("key3", scanned.get(2).key);
        Assertions.assertEquals("value3", scanned.get(2).value);
    }

    /**
     * Exercises a prefix scan against a store built from
     * {@code Stores.persistentKeyValueStore} with caching enabled and logging disabled,
     * wired through {@code defineWithStoresOldAPI} with an {@code OldAPIStatefulProcessor}.
     *
     * <p>Four records are piped in, two of them under {@code key1}. The output topic must
     * stay empty, and the scan for {@code DEFAULT_PREFIX} must yield {@code key1} (holding
     * the later {@code value4}), {@code key2} and {@code key3}, in that order.
     */
    @Test
    @Deprecated
    public void testPrefixScanPersistentStoreWithCachingNoLoggingOldProcessor() {
        final StoreBuilder<KeyValueStore<String, String>> storeBuilder = Stores
            .keyValueStoreBuilder(Stores.persistentKeyValueStore(DEFAULT_STORE_NAME), Serdes.String(), Serdes.String())
            .withCachingEnabled()
            .withLoggingDisabled();
        this.topology
            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
            .addProcessor("processor1", defineWithStoresOldAPI(() -> new OldAPIStatefulProcessor(DEFAULT_STORE_NAME), Collections.singleton(storeBuilder)), "source1")
            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
        this.driver = new TopologyTestDriver(this.topology, this.props);

        // Typed topic handles keep the pipeInput calls compile-time checked.
        final TestInputTopic<String, String> inputTopic = this.driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
        final TestOutputTopic<Integer, String> outputTopic = this.driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());

        inputTopic.pipeInput("key1", "value1");
        inputTopic.pipeInput("key2", "value2");
        inputTopic.pipeInput("key3", "value3");
        // Second write to key1: the scan below must observe this newer value.
        inputTopic.pipeInput("key1", "value4");
        Assertions.assertTrue(outputTopic.isEmpty());

        final List<KeyValue<String, String>> scanned = prefixScanResults(this.driver.getKeyValueStore(DEFAULT_STORE_NAME), DEFAULT_PREFIX);
        Assertions.assertEquals("key1", scanned.get(0).key);
        Assertions.assertEquals("value4", scanned.get(0).value);
        Assertions.assertEquals("key2", scanned.get(1).key);
        Assertions.assertEquals("value2", scanned.get(1).value);
        Assertions.assertEquals("key3", scanned.get(2).key);
        Assertions.assertEquals("value3", scanned.get(2).value);
    }

    /**
     * Exercises a prefix scan against a store built from
     * {@code Stores.persistentKeyValueStore} with caching enabled and logging enabled
     * (using an empty logging config map), wired through {@code defineWithStoresOldAPI}
     * with an {@code OldAPIStatefulProcessor}.
     *
     * <p>Four records are piped in, two of them under {@code key1}. The output topic must
     * stay empty, and the scan for {@code DEFAULT_PREFIX} must yield {@code key1} (holding
     * the later {@code value4}), {@code key2} and {@code key3}, in that order.
     */
    @Test
    @Deprecated
    public void testPrefixScanPersistentStoreWithCachingWithLoggingOldProcessor() {
        final StoreBuilder<KeyValueStore<String, String>> storeBuilder = Stores
            .keyValueStoreBuilder(Stores.persistentKeyValueStore(DEFAULT_STORE_NAME), Serdes.String(), Serdes.String())
            .withCachingEnabled()
            .withLoggingEnabled(Collections.emptyMap());
        this.topology
            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
            .addProcessor("processor1", defineWithStoresOldAPI(() -> new OldAPIStatefulProcessor(DEFAULT_STORE_NAME), Collections.singleton(storeBuilder)), "source1")
            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
        this.driver = new TopologyTestDriver(this.topology, this.props);

        // Typed topic handles keep the pipeInput calls compile-time checked.
        final TestInputTopic<String, String> inputTopic = this.driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
        final TestOutputTopic<Integer, String> outputTopic = this.driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());

        inputTopic.pipeInput("key1", "value1");
        inputTopic.pipeInput("key2", "value2");
        inputTopic.pipeInput("key3", "value3");
        // Second write to key1: the scan below must observe this newer value.
        inputTopic.pipeInput("key1", "value4");
        Assertions.assertTrue(outputTopic.isEmpty());

        final List<KeyValue<String, String>> scanned = prefixScanResults(this.driver.getKeyValueStore(DEFAULT_STORE_NAME), DEFAULT_PREFIX);
        Assertions.assertEquals("key1", scanned.get(0).key);
        Assertions.assertEquals("value4", scanned.get(0).value);
        Assertions.assertEquals("key2", scanned.get(1).key);
        Assertions.assertEquals("value2", scanned.get(1).value);
        Assertions.assertEquals("key3", scanned.get(2).key);
        Assertions.assertEquals("value3", scanned.get(2).value);
    }

    /**
     * Exercises a prefix scan against a store built from
     * {@code Stores.persistentTimestampedKeyValueStore} with caching and logging both
     * disabled, wired through {@code defineWithStoresOldAPI} with an
     * {@code OldAPIStatefulProcessor}.
     *
     * <p>Four records are piped in, two of them under {@code key1}. The output topic must
     * stay empty, and the scan for {@code DEFAULT_PREFIX} must yield {@code key1} (holding
     * the later {@code value4}), {@code key2} and {@code key3}, in that order.
     */
    @Test
    @Deprecated
    public void testPrefixScanPersistentTimestampedStoreNoCachingNoLoggingOldProcessor() {
        final StoreBuilder<KeyValueStore<String, String>> storeBuilder = Stores
            .keyValueStoreBuilder(Stores.persistentTimestampedKeyValueStore(DEFAULT_STORE_NAME), Serdes.String(), Serdes.String())
            .withCachingDisabled()
            .withLoggingDisabled();
        this.topology
            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
            .addProcessor("processor1", defineWithStoresOldAPI(() -> new OldAPIStatefulProcessor(DEFAULT_STORE_NAME), Collections.singleton(storeBuilder)), "source1")
            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
        this.driver = new TopologyTestDriver(this.topology, this.props);

        // Typed topic handles keep the pipeInput calls compile-time checked.
        final TestInputTopic<String, String> inputTopic = this.driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
        final TestOutputTopic<Integer, String> outputTopic = this.driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());

        inputTopic.pipeInput("key1", "value1");
        inputTopic.pipeInput("key2", "value2");
        inputTopic.pipeInput("key3", "value3");
        // Second write to key1: the scan below must observe this newer value.
        inputTopic.pipeInput("key1", "value4");
        Assertions.assertTrue(outputTopic.isEmpty());

        final List<KeyValue<String, String>> scanned = prefixScanResults(this.driver.getKeyValueStore(DEFAULT_STORE_NAME), DEFAULT_PREFIX);
        Assertions.assertEquals("key1", scanned.get(0).key);
        Assertions.assertEquals("value4", scanned.get(0).value);
        Assertions.assertEquals("key2", scanned.get(1).key);
        Assertions.assertEquals("value2", scanned.get(1).value);
        Assertions.assertEquals("key3", scanned.get(2).key);
        Assertions.assertEquals("value3", scanned.get(2).value);
    }

    /**
     * Exercises a prefix scan against a store built from
     * {@code Stores.persistentTimestampedKeyValueStore} with caching enabled and logging
     * disabled, wired through {@code defineWithStoresOldAPI} with an
     * {@code OldAPIStatefulProcessor}.
     *
     * <p>Four records are piped in, two of them under {@code key1}. The output topic must
     * stay empty, and the scan for {@code DEFAULT_PREFIX} must yield {@code key1} (holding
     * the later {@code value4}), {@code key2} and {@code key3}, in that order.
     */
    @Test
    @Deprecated
    public void testPrefixScanPersistentTimestampedStoreWithCachingNoLoggingOldProcessor() {
        final StoreBuilder<KeyValueStore<String, String>> storeBuilder = Stores
            .keyValueStoreBuilder(Stores.persistentTimestampedKeyValueStore(DEFAULT_STORE_NAME), Serdes.String(), Serdes.String())
            .withCachingEnabled()
            .withLoggingDisabled();
        this.topology
            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
            .addProcessor("processor1", defineWithStoresOldAPI(() -> new OldAPIStatefulProcessor(DEFAULT_STORE_NAME), Collections.singleton(storeBuilder)), "source1")
            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
        this.driver = new TopologyTestDriver(this.topology, this.props);

        // Typed topic handles keep the pipeInput calls compile-time checked.
        final TestInputTopic<String, String> inputTopic = this.driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
        final TestOutputTopic<Integer, String> outputTopic = this.driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());

        inputTopic.pipeInput("key1", "value1");
        inputTopic.pipeInput("key2", "value2");
        inputTopic.pipeInput("key3", "value3");
        // Second write to key1: the scan below must observe this newer value.
        inputTopic.pipeInput("key1", "value4");
        Assertions.assertTrue(outputTopic.isEmpty());

        final List<KeyValue<String, String>> scanned = prefixScanResults(this.driver.getKeyValueStore(DEFAULT_STORE_NAME), DEFAULT_PREFIX);
        Assertions.assertEquals("key1", scanned.get(0).key);
        Assertions.assertEquals("value4", scanned.get(0).value);
        Assertions.assertEquals("key2", scanned.get(1).key);
        Assertions.assertEquals("value2", scanned.get(1).value);
        Assertions.assertEquals("key3", scanned.get(2).key);
        Assertions.assertEquals("value3", scanned.get(2).value);
    }

    /**
     * Exercises a prefix scan against a store built from
     * {@code Stores.persistentTimestampedKeyValueStore} with caching enabled and logging
     * enabled (using an empty logging config map), wired through
     * {@code defineWithStoresOldAPI} with an {@code OldAPIStatefulProcessor}.
     *
     * <p>Four records are piped in, two of them under {@code key1}. The output topic must
     * stay empty, and the scan for {@code DEFAULT_PREFIX} must yield {@code key1} (holding
     * the later {@code value4}), {@code key2} and {@code key3}, in that order.
     */
    @Test
    @Deprecated
    public void testPrefixScanPersistentTimestampedStoreWithCachingWithLoggingOldProcessor() {
        final StoreBuilder<KeyValueStore<String, String>> storeBuilder = Stores
            .keyValueStoreBuilder(Stores.persistentTimestampedKeyValueStore(DEFAULT_STORE_NAME), Serdes.String(), Serdes.String())
            .withCachingEnabled()
            .withLoggingEnabled(Collections.emptyMap());
        this.topology
            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
            .addProcessor("processor1", defineWithStoresOldAPI(() -> new OldAPIStatefulProcessor(DEFAULT_STORE_NAME), Collections.singleton(storeBuilder)), "source1")
            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
        this.driver = new TopologyTestDriver(this.topology, this.props);

        // Typed topic handles keep the pipeInput calls compile-time checked.
        final TestInputTopic<String, String> inputTopic = this.driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
        final TestOutputTopic<Integer, String> outputTopic = this.driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());

        inputTopic.pipeInput("key1", "value1");
        inputTopic.pipeInput("key2", "value2");
        inputTopic.pipeInput("key3", "value3");
        // Second write to key1: the scan below must observe this newer value.
        inputTopic.pipeInput("key1", "value4");
        Assertions.assertTrue(outputTopic.isEmpty());

        final List<KeyValue<String, String>> scanned = prefixScanResults(this.driver.getKeyValueStore(DEFAULT_STORE_NAME), DEFAULT_PREFIX);
        Assertions.assertEquals("key1", scanned.get(0).key);
        Assertions.assertEquals("value4", scanned.get(0).value);
        Assertions.assertEquals("key2", scanned.get(1).key);
        Assertions.assertEquals("value2", scanned.get(1).value);
        Assertions.assertEquals("key3", scanned.get(2).key);
        Assertions.assertEquals("value3", scanned.get(2).value);
    }

    /**
     * Runs four records (two of them for "key1") through an old-API stateful processor connected to an
     * LRU-map store (max 100 entries) with caching and logging disabled. Expects nothing on the output
     * topic and expects the prefix scan to return the latest value for each of the three keys.
     */
    @Test
    @Deprecated
    public void testPrefixScanLruMapNoCachingNoLoggingOldProcessor() {
        final StoreBuilder<KeyValueStore<String, String>> storeBuilder = Stores
            .keyValueStoreBuilder(Stores.lruMap(DEFAULT_STORE_NAME, 100), Serdes.String(), Serdes.String())
            .withCachingDisabled()
            .withLoggingDisabled();
        this.topology
            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
            .addProcessor("processor1", defineWithStoresOldAPI(() -> new OldAPIStatefulProcessor(DEFAULT_STORE_NAME), Collections.singleton(storeBuilder)), "source1")
            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
        this.driver = new TopologyTestDriver(this.topology, this.props);

        final TestInputTopic<String, String> input = this.driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
        final TestOutputTopic<Integer, String> output = this.driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());
        input.pipeInput("key1", "value1");
        input.pipeInput("key2", "value2");
        input.pipeInput("key3", "value3");
        // Second write for "key1": the scan below must see this value, not "value1".
        input.pipeInput("key1", "value4");
        Assertions.assertTrue(output.isEmpty());

        final KeyValueStore<String, String> store = this.driver.getKeyValueStore(DEFAULT_STORE_NAME);
        final List<KeyValue<String, String>> scanned = prefixScanResults(store, DEFAULT_PREFIX);
        final KeyValue<String, String> first = scanned.get(0);
        Assertions.assertEquals("key1", first.key);
        Assertions.assertEquals("value4", first.value);
        final KeyValue<String, String> second = scanned.get(1);
        Assertions.assertEquals("key2", second.key);
        Assertions.assertEquals("value2", second.value);
        final KeyValue<String, String> third = scanned.get(2);
        Assertions.assertEquals("key3", third.key);
        Assertions.assertEquals("value3", third.value);
    }

    /**
     * Runs four records (two of them for "key1") through an old-API stateful processor connected to an
     * LRU-map store (max 100 entries) with caching enabled and logging disabled. Expects nothing on the
     * output topic and expects the prefix scan to return the latest value for each of the three keys.
     */
    @Test
    @Deprecated
    public void testPrefixScanLruMapWithCachingNoLoggingOldProcessor() {
        final StoreBuilder<KeyValueStore<String, String>> storeBuilder = Stores
            .keyValueStoreBuilder(Stores.lruMap(DEFAULT_STORE_NAME, 100), Serdes.String(), Serdes.String())
            .withCachingEnabled()
            .withLoggingDisabled();
        this.topology
            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
            .addProcessor("processor1", defineWithStoresOldAPI(() -> new OldAPIStatefulProcessor(DEFAULT_STORE_NAME), Collections.singleton(storeBuilder)), "source1")
            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
        this.driver = new TopologyTestDriver(this.topology, this.props);

        final TestInputTopic<String, String> input = this.driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
        final TestOutputTopic<Integer, String> output = this.driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());
        input.pipeInput("key1", "value1");
        input.pipeInput("key2", "value2");
        input.pipeInput("key3", "value3");
        // Second write for "key1": the scan below must see this value, not "value1".
        input.pipeInput("key1", "value4");
        Assertions.assertTrue(output.isEmpty());

        final KeyValueStore<String, String> store = this.driver.getKeyValueStore(DEFAULT_STORE_NAME);
        final List<KeyValue<String, String>> scanned = prefixScanResults(store, DEFAULT_PREFIX);
        final KeyValue<String, String> first = scanned.get(0);
        Assertions.assertEquals("key1", first.key);
        Assertions.assertEquals("value4", first.value);
        final KeyValue<String, String> second = scanned.get(1);
        Assertions.assertEquals("key2", second.key);
        Assertions.assertEquals("value2", second.value);
        final KeyValue<String, String> third = scanned.get(2);
        Assertions.assertEquals("key3", third.key);
        Assertions.assertEquals("value3", third.value);
    }

    /**
     * Runs four records (two of them for "key1") through an old-API stateful processor connected to an
     * LRU-map store (max 100 entries) with caching and logging enabled. Expects nothing on the output
     * topic and expects the prefix scan to return the latest value for each of the three keys.
     */
    @Test
    @Deprecated
    public void testPrefixScanLruMapWithCachingWithLoggingOldProcessor() {
        final StoreBuilder<KeyValueStore<String, String>> storeBuilder = Stores
            .keyValueStoreBuilder(Stores.lruMap(DEFAULT_STORE_NAME, 100), Serdes.String(), Serdes.String())
            .withCachingEnabled()
            .withLoggingEnabled(Collections.emptyMap());
        this.topology
            .addSource("source1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
            .addProcessor("processor1", defineWithStoresOldAPI(() -> new OldAPIStatefulProcessor(DEFAULT_STORE_NAME), Collections.singleton(storeBuilder)), "source1")
            .addSink("counts", OUTPUT_TOPIC_1, "processor1");
        this.driver = new TopologyTestDriver(this.topology, this.props);

        final TestInputTopic<String, String> input = this.driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
        final TestOutputTopic<Integer, String> output = this.driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer());
        input.pipeInput("key1", "value1");
        input.pipeInput("key2", "value2");
        input.pipeInput("key3", "value3");
        // Second write for "key1": the scan below must see this value, not "value1".
        input.pipeInput("key1", "value4");
        Assertions.assertTrue(output.isEmpty());

        final KeyValueStore<String, String> store = this.driver.getKeyValueStore(DEFAULT_STORE_NAME);
        final List<KeyValue<String, String>> scanned = prefixScanResults(store, DEFAULT_PREFIX);
        final KeyValue<String, String> first = scanned.get(0);
        Assertions.assertEquals("key1", first.key);
        Assertions.assertEquals("value4", first.value);
        final KeyValue<String, String> second = scanned.get(1);
        Assertions.assertEquals("key2", second.key);
        Assertions.assertEquals("value2", second.value);
        final KeyValue<String, String> third = scanned.get(2);
        Assertions.assertEquals("key3", third.key);
        Assertions.assertEquals("value3", third.value);
    }

    /**
     * Registers an in-memory global store (logging disabled) fed by an old-API stateful processor,
     * pipes two records into the global topic and expects both to be readable from the store.
     */
    @Test
    @Deprecated
    public void shouldDriveGlobalStore() {
        final String storeName = "my-store";
        final String globalTopic = AssignmentTestUtils.TOPIC_PREFIX;
        final StoreBuilder<KeyValueStore<String, String>> globalStoreBuilder = Stores
            .keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), Serdes.String(), Serdes.String())
            .withLoggingDisabled();
        this.topology.addGlobalStore(
            globalStoreBuilder,
            "global",
            STRING_DESERIALIZER,
            STRING_DESERIALIZER,
            globalTopic,
            "processor",
            define(new OldAPIStatefulProcessor(storeName)));
        this.driver = new TopologyTestDriver(this.topology, this.props);

        final TestInputTopic<String, String> input = this.driver.createInputTopic(globalTopic, STRING_SERIALIZER, STRING_SERIALIZER);
        final KeyValueStore<String, String> globalStore = this.driver.getKeyValueStore(storeName);
        input.pipeInput("key1", "value1");
        input.pipeInput("key2", "value2");

        Assertions.assertEquals("value1", globalStore.get("key1"));
        Assertions.assertEquals("value2", globalStore.get("key2"));
    }

    /**
     * Drives the two-source topology built by {@code createSimpleMultiSourceTopology(10)}: a record
     * piped into the first input topic must show up only on the first output topic, and a record piped
     * into the second input topic must show up on the second output topic.
     */
    @Test
    public void testDrivingSimpleMultiSourceTopology() {
        final Instant startTime = Instant.ofEpochMilli(0L);
        this.driver = new TopologyTestDriver(createSimpleMultiSourceTopology(10), this.props);
        final TestInputTopic<String, String> firstInput =
            this.driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER, startTime, Duration.ZERO);
        final TestOutputTopic<String, String> firstOutput =
            this.driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.String().deserializer(), Serdes.String().deserializer());
        final TestOutputTopic<String, String> secondOutput =
            this.driver.createOutputTopic(OUTPUT_TOPIC_2, Serdes.String().deserializer(), Serdes.String().deserializer());

        firstInput.pipeInput("key1", "value1");
        assertNextOutputRecord(firstOutput.readRecord(), "key1", "value1");
        Assertions.assertTrue(secondOutput.isEmpty());

        final TestInputTopic<String, String> secondInput =
            this.driver.createInputTopic(INPUT_TOPIC_2, STRING_SERIALIZER, STRING_SERIALIZER, Instant.ofEpochMilli(0L), Duration.ZERO);
        secondInput.pipeInput("key2", "value2");
        assertNextOutputRecord(secondOutput.readRecord(), "key2", "value2");
        Assertions.assertTrue(secondOutput.isEmpty());
    }

    /**
     * Drives the topology from {@code createForwardToSourceTopology()} with three records and expects
     * them, in order, on the second output topic.
     */
    @Test
    public void testDrivingForwardToSourceTopology() {
        this.driver = new TopologyTestDriver(createForwardToSourceTopology(), this.props);
        final TestInputTopic<String, String> input =
            this.driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER, Instant.ofEpochMilli(0L), Duration.ZERO);
        input.pipeInput("key1", "value1");
        input.pipeInput("key2", "value2");
        input.pipeInput("key3", "value3");

        final TestOutputTopic<String, String> finalOutput =
            this.driver.createOutputTopic(OUTPUT_TOPIC_2, Serdes.String().deserializer(), Serdes.String().deserializer());
        assertNextOutputRecord(finalOutput.readRecord(), "key1", "value1");
        assertNextOutputRecord(finalOutput.readRecord(), "key2", "value2");
        assertNextOutputRecord(finalOutput.readRecord(), "key3", "value3");
    }

    /**
     * Drives the topology from {@code createInternalRepartitioningTopology()} with three records and
     * expects them, in order, on the first output topic.
     */
    @Test
    public void testDrivingInternalRepartitioningTopology() {
        this.driver = new TopologyTestDriver(createInternalRepartitioningTopology(), this.props);
        final TestInputTopic<String, String> input =
            this.driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER, Instant.ofEpochMilli(0L), Duration.ZERO);
        input.pipeInput("key1", "value1");
        input.pipeInput("key2", "value2");
        input.pipeInput("key3", "value3");

        final TestOutputTopic<String, String> output =
            this.driver.createOutputTopic(OUTPUT_TOPIC_1, STRING_DESERIALIZER, STRING_DESERIALIZER);
        assertNextOutputRecord(output.readRecord(), "key1", "value1");
        assertNextOutputRecord(output.readRecord(), "key2", "value2");
        assertNextOutputRecord(output.readRecord(), "key3", "value3");
    }

    /**
     * Pipes values of the form {@code "<value>@<timestamp>"} through the topology from
     * {@code createInternalRepartitioningWithValueTimestampTopology()} and expects output records
     * carrying the bare value, null headers and the timestamp taken from the suffix.
     */
    @Test
    public void testDrivingInternalRepartitioningForwardingTimestampTopology() {
        this.driver = new TopologyTestDriver(createInternalRepartitioningWithValueTimestampTopology(), this.props);
        final TestInputTopic<String, String> input =
            this.driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
        input.pipeInput("key1", "value1@1000");
        input.pipeInput("key2", "value2@2000");
        input.pipeInput("key3", "value3@3000");

        final TestOutputTopic<String, String> output =
            this.driver.createOutputTopic(OUTPUT_TOPIC_1, STRING_DESERIALIZER, STRING_DESERIALIZER);
        MatcherAssert.assertThat(
            output.readRecord(),
            CoreMatchers.equalTo(new TestRecord<>("key1", "value1", (Headers) null, 1000L)));
        MatcherAssert.assertThat(
            output.readRecord(),
            CoreMatchers.equalTo(new TestRecord<>("key2", "value2", (Headers) null, 2000L)));
        MatcherAssert.assertThat(
            output.readRecord(),
            CoreMatchers.equalTo(new TestRecord<>("key3", "value3", (Headers) null, 3000L)));
    }

    /**
     * Expects the built topology's string form to list a single source together with both of its topics.
     */
    @Test
    public void shouldCreateStringWithSourceAndTopics() {
        this.topology.addSource("source", AssignmentTestUtils.TP_1_NAME, AssignmentTestUtils.TP_2_NAME);
        final String description = this.topology.getInternalBuilder().buildTopology().toString();
        MatcherAssert.assertThat(description, CoreMatchers.containsString("source:\n\t\ttopics:\t\t[topic1, topic2]\n"));
    }

    /**
     * Expects the built topology's string form to list each of two sources together with its topics.
     */
    @Test
    public void shouldCreateStringWithMultipleSourcesAndTopics() {
        this.topology.addSource("source", AssignmentTestUtils.TP_1_NAME, AssignmentTestUtils.TP_2_NAME);
        this.topology.addSource("source2", "t", "t1", "t2");

        final String description = this.topology.getInternalBuilder().buildTopology().toString();

        MatcherAssert.assertThat(description, CoreMatchers.containsString("source:\n\t\ttopics:\t\t[topic1, topic2]\n"));
        MatcherAssert.assertThat(description, CoreMatchers.containsString("source2:\n\t\ttopics:\t\t[t, t1, t2]\n"));
    }

    /**
     * Expects the built topology's string form to list both processors as children of the source and
     * to contain an entry for each processor.
     */
    @Test
    public void shouldCreateStringWithProcessors() {
        this.topology
            .addSource("source", "t")
            .addProcessor("processor", this.mockProcessorSupplier, "source")
            .addProcessor("other", this.mockProcessorSupplier, "source");

        final String description = this.topology.getInternalBuilder().buildTopology().toString();

        MatcherAssert.assertThat(description, CoreMatchers.containsString("\t\tchildren:\t[processor, other]"));
        MatcherAssert.assertThat(description, CoreMatchers.containsString("processor:\n"));
        MatcherAssert.assertThat(description, CoreMatchers.containsString("other:\n"));
    }

    /**
     * Builds a two-level processor tree under one source and expects the topology's string form to
     * show each second-level processor with its own child.
     */
    @Test
    public void shouldRecursivelyPrintChildren() {
        this.topology
            .addSource("source", "t")
            .addProcessor("processor", this.mockProcessorSupplier, "source")
            .addProcessor("child-one", this.mockProcessorSupplier, "processor")
            .addProcessor("child-one-one", this.mockProcessorSupplier, "child-one")
            .addProcessor("child-two", this.mockProcessorSupplier, "processor")
            .addProcessor("child-two-one", this.mockProcessorSupplier, "child-two");

        final String description = this.topology.getInternalBuilder().buildTopology().toString();

        MatcherAssert.assertThat(description, CoreMatchers.containsString("child-one:\n\t\tchildren:\t[child-one-one]"));
        MatcherAssert.assertThat(description, CoreMatchers.containsString("child-two:\n\t\tchildren:\t[child-two-one]"));
    }

    /**
     * Pipes three records with explicit timestamps through the simple topology and expects each
     * output record to carry the same timestamp as its input.
     */
    @Test
    public void shouldConsiderTimeStamps() {
        this.driver = new TopologyTestDriver(createSimpleTopology(10), this.props);
        final TestInputTopic<String, String> input =
            this.driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
        input.pipeInput("key1", "value1", 10L);
        input.pipeInput("key2", "value2", 20L);
        input.pipeInput("key3", "value3", 30L);

        final TestOutputTopic<String, String> output =
            this.driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.String().deserializer(), Serdes.String().deserializer());
        assertNextOutputRecord(output.readRecord(), "key1", "value1", 10L);
        assertNextOutputRecord(output.readRecord(), "key2", "value2", 20L);
        assertNextOutputRecord(output.readRecord(), "key3", "value3", 30L);
    }

    /**
     * Pipes three timestamped records through the timestamp topology and expects each output
     * timestamp to be 10 greater than the corresponding input timestamp.
     */
    @Test
    public void shouldConsiderModifiedTimeStamps() {
        this.driver = new TopologyTestDriver(createTimestampTopology(10), this.props);
        final TestInputTopic<String, String> input =
            this.driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
        input.pipeInput("key1", "value1", 10L);
        input.pipeInput("key2", "value2", 20L);
        input.pipeInput("key3", "value3", 30L);

        final TestOutputTopic<String, String> output =
            this.driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.String().deserializer(), Serdes.String().deserializer());
        assertNextOutputRecord(output.readRecord(), "key1", "value1", 20L);
        assertNextOutputRecord(output.readRecord(), "key2", "value2", 30L);
        assertNextOutputRecord(output.readRecord(), "key3", "value3", 40L);
    }

    /**
     * Drives the fan-out timestamp topology with two records. For each input record, three records
     * are expected on each of the two output topics, with the timestamps asserted below, and both
     * topics must be drained afterwards.
     */
    @Test
    public void shouldConsiderModifiedTimeStampsForMultipleProcessors() {
        this.driver = new TopologyTestDriver(createMultiProcessorTimestampTopology(10), this.props);
        final TestInputTopic<String, String> input =
            this.driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
        final TestOutputTopic<String, String> sink1Output =
            this.driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.String().deserializer(), Serdes.String().deserializer());
        final TestOutputTopic<String, String> sink2Output =
            this.driver.createOutputTopic(OUTPUT_TOPIC_2, Serdes.String().deserializer(), Serdes.String().deserializer());

        // First record, input timestamp 10.
        input.pipeInput("key1", "value1", 10L);
        assertNextOutputRecord(sink1Output.readRecord(), "key1", "value1", 10L);
        assertNextOutputRecord(sink2Output.readRecord(), "key1", "value1", 20L);
        assertNextOutputRecord(sink1Output.readRecord(), "key1", "value1", 15L);
        assertNextOutputRecord(sink2Output.readRecord(), "key1", "value1", 20L);
        assertNextOutputRecord(sink1Output.readRecord(), "key1", "value1", 12L);
        assertNextOutputRecord(sink2Output.readRecord(), "key1", "value1", 22L);
        Assertions.assertTrue(sink1Output.isEmpty());
        Assertions.assertTrue(sink2Output.isEmpty());

        // Second record, input timestamp 20.
        input.pipeInput("key2", "value2", 20L);
        assertNextOutputRecord(sink1Output.readRecord(), "key2", "value2", 20L);
        assertNextOutputRecord(sink2Output.readRecord(), "key2", "value2", 30L);
        assertNextOutputRecord(sink1Output.readRecord(), "key2", "value2", 25L);
        assertNextOutputRecord(sink2Output.readRecord(), "key2", "value2", 30L);
        assertNextOutputRecord(sink1Output.readRecord(), "key2", "value2", 22L);
        assertNextOutputRecord(sink2Output.readRecord(), "key2", "value2", 32L);
        Assertions.assertTrue(sink1Output.isEmpty());
        Assertions.assertTrue(sink2Output.isEmpty());
    }

    /**
     * Pipes three records carrying {@code HEADERS} through the simple topology and expects the same
     * headers and timestamps on the output records.
     */
    @Test
    public void shouldConsiderHeaders() {
        this.driver = new TopologyTestDriver(createSimpleTopology(10), this.props);
        final TestInputTopic<String, String> input =
            this.driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
        input.pipeInput(new TestRecord<>("key1", "value1", HEADERS, 10L));
        input.pipeInput(new TestRecord<>("key2", "value2", HEADERS, 20L));
        input.pipeInput(new TestRecord<>("key3", "value3", HEADERS, 30L));

        final TestOutputTopic<String, String> output =
            this.driver.createOutputTopic(OUTPUT_TOPIC_1, STRING_DESERIALIZER, STRING_DESERIALIZER);
        assertNextOutputRecord(output.readRecord(), "key1", "value1", HEADERS, 10L);
        assertNextOutputRecord(output.readRecord(), "key2", "value2", HEADERS, 20L);
        assertNextOutputRecord(output.readRecord(), "key3", "value3", HEADERS, 30L);
    }

    /**
     * Pipes three records without explicit headers through the add-header topology and expects
     * {@code HEADERS} on every output record.
     */
    @Test
    public void shouldAddHeaders() {
        this.driver = new TopologyTestDriver(createAddHeaderTopology(), this.props);
        final TestInputTopic<String, String> input =
            this.driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
        input.pipeInput("key1", "value1", 10L);
        input.pipeInput("key2", "value2", 20L);
        input.pipeInput("key3", "value3", 30L);

        final TestOutputTopic<String, String> output =
            this.driver.createOutputTopic(OUTPUT_TOPIC_1, STRING_DESERIALIZER, STRING_DESERIALIZER);
        assertNextOutputRecord(output.readRecord(), "key1", "value1", HEADERS, 10L);
        assertNextOutputRecord(output.readRecord(), "key2", "value2", HEADERS, 20L);
        assertNextOutputRecord(output.readRecord(), "key3", "value3", HEADERS, 30L);
    }

    /**
     * A topology built from an empty wrapper must report neither a persistent local store nor a
     * persistent global store.
     */
    @Test
    public void statelessTopologyShouldNotHavePersistentStore() {
        final TopologyWrapper emptyTopology = new TopologyWrapper();
        final ProcessorTopology processorTopology = emptyTopology.getInternalBuilder("anyAppId").buildTopology();

        Assertions.assertFalse(processorTopology.hasPersistentLocalStore());
        Assertions.assertFalse(processorTopology.hasPersistentGlobalStore());
    }

    /** A local in-memory key-value store must not be reported as a persistent local store. */
    @Test
    public void inMemoryStoreShouldNotResultInPersistentLocalStore() {
        final ProcessorTopology processorTopology = createLocalStoreTopology(Stores.inMemoryKeyValueStore("my-store"));
        Assertions.assertFalse(processorTopology.hasPersistentLocalStore());
    }

    /** A local persistent key-value store must be reported as a persistent local store. */
    @Test
    public void persistentLocalStoreShouldBeDetected() {
        final ProcessorTopology processorTopology = createLocalStoreTopology(Stores.persistentKeyValueStore("my-store"));
        Assertions.assertTrue(processorTopology.hasPersistentLocalStore());
    }

    /** A global in-memory key-value store must not be reported as a persistent global store. */
    @Test
    public void inMemoryStoreShouldNotResultInPersistentGlobalStore() {
        final ProcessorTopology processorTopology = createGlobalStoreTopology(Stores.inMemoryKeyValueStore("my-store"));
        Assertions.assertFalse(processorTopology.hasPersistentGlobalStore());
    }

    /** A global persistent key-value store must be reported as a persistent global store. */
    @Test
    public void persistentGlobalStoreShouldBeDetected() {
        final ProcessorTopology processorTopology = createGlobalStoreTopology(Stores.persistentKeyValueStore("my-store"));
        Assertions.assertTrue(processorTopology.hasPersistentGlobalStore());
    }

    /**
     * Builds a source -> processor topology in a fresh {@link TopologyWrapper}, attaches a String/String
     * key-value store created from the given supplier to the processor, and returns the built topology.
     *
     * @param keyValueBytesStoreSupplier supplier of the underlying store; its name is also passed to the processor
     * @return the processor topology built for application id "anyAppId"
     */
    private ProcessorTopology createLocalStoreTopology(KeyValueBytesStoreSupplier keyValueBytesStoreSupplier) {
        final TopologyWrapper wrapper = new TopologyWrapper();
        wrapper
            .addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, AssignmentTestUtils.TOPIC_PREFIX)
            .addProcessor("processor", () -> new StatefulProcessor(keyValueBytesStoreSupplier.name()), "source")
            .addStateStore(Stores.keyValueStoreBuilder(keyValueBytesStoreSupplier, Serdes.String(), Serdes.String()), "processor");
        return wrapper.getInternalBuilder("anyAppId").buildTopology();
    }

    /**
     * Registers a String/String global store (logging disabled) created from the given supplier in a
     * fresh {@link TopologyWrapper}, updated by an old-API stateful processor, and returns the built topology.
     *
     * @param keyValueBytesStoreSupplier supplier of the underlying store; its name is also passed to the processor
     * @return the processor topology built for application id "anyAppId"
     */
    @Deprecated
    private ProcessorTopology createGlobalStoreTopology(KeyValueBytesStoreSupplier keyValueBytesStoreSupplier) {
        final TopologyWrapper wrapper = new TopologyWrapper();
        wrapper.addGlobalStore(
            Stores.keyValueStoreBuilder(keyValueBytesStoreSupplier, Serdes.String(), Serdes.String()).withLoggingDisabled(),
            "global",
            STRING_DESERIALIZER,
            STRING_DESERIALIZER,
            AssignmentTestUtils.TOPIC_PREFIX,
            "processor",
            define(new OldAPIStatefulProcessor(keyValueBytesStoreSupplier.name())));
        return wrapper.getInternalBuilder("anyAppId").buildTopology();
    }

    /**
     * Asserts key and value of the record, expecting timestamp 0 and empty headers.
     *
     * @param testRecord record read from an output topic
     * @param str        expected key
     * @param str2       expected value
     */
    private void assertNextOutputRecord(TestRecord<String, String> testRecord, String str, String str2) {
        final Long expectedTimestamp = 0L;
        assertNextOutputRecord(testRecord, str, str2, expectedTimestamp);
    }

    /**
     * Asserts key, value and timestamp of the record, expecting empty headers.
     *
     * @param testRecord record read from an output topic
     * @param str        expected key
     * @param str2       expected value
     * @param l          expected timestamp
     */
    private void assertNextOutputRecord(TestRecord<String, String> testRecord, String str, String str2, Long l) {
        final Headers noHeaders = new RecordHeaders();
        assertNextOutputRecord(testRecord, str, str2, noHeaders, l);
    }

    /**
     * Asserts, in this order, the key, value, timestamp and headers of the record.
     *
     * @param testRecord record read from an output topic
     * @param str        expected key
     * @param str2       expected value
     * @param headers    expected headers
     * @param l          expected timestamp
     */
    private void assertNextOutputRecord(TestRecord<String, String> testRecord, String str, String str2, Headers headers, Long l) {
        final String actualKey = testRecord.key();
        Assertions.assertEquals(str, actualKey);

        final String actualValue = testRecord.value();
        Assertions.assertEquals(str2, actualValue);

        final Long actualTimestamp = testRecord.timestamp();
        Assertions.assertEquals(l, actualTimestamp);

        final Headers actualHeaders = testRecord.headers();
        Assertions.assertEquals(headers, actualHeaders);
    }

    /**
     * Returns a partitioner that ignores topic, key, value and partition count and always answers
     * with the given partition.
     *
     * @param num the partition to return for every record
     */
    private StreamPartitioner<Object, Object> constantPartitioner(Integer num) {
        return (topic, key, value, numPartitions) -> num;
    }

    /**
     * Adds source -> {@code ForwardingProcessor} -> sink to the shared topology, with the sink using a
     * constant partitioner.
     *
     * @param i partition the sink's partitioner always returns
     */
    private Topology createSimpleTopology(int i) {
        return this.topology
            .addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
            .addProcessor("processor", ForwardingProcessor::new, "source")
            .addSink("sink", OUTPUT_TOPIC_1, constantPartitioner(Integer.valueOf(i)), "processor");
    }

    /**
     * Adds source -> {@code TimestampProcessor} -> sink to the shared topology, with the sink using a
     * constant partitioner.
     *
     * @param i partition the sink's partitioner always returns
     */
    private Topology createTimestampTopology(int i) {
        return this.topology
            .addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
            .addProcessor("processor", TimestampProcessor::new, "source")
            .addSink("sink", OUTPUT_TOPIC_1, constantPartitioner(Integer.valueOf(i)), "processor");
    }

    /**
     * Adds a fan-out topology to the shared topology: the source feeds a {@code FanOutTimestampProcessor}
     * whose children "child1" ({@code ForwardingProcessor}) and "child2" ({@code TimestampProcessor})
     * write to the first and second output topic respectively.
     *
     * @param i partition both sinks' partitioners always return
     */
    private Topology createMultiProcessorTimestampTopology(int i) {
        return this.topology
            .addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
            .addProcessor("processor", () -> new FanOutTimestampProcessor("child1", "child2"), "source")
            .addProcessor("child1", ForwardingProcessor::new, "processor")
            .addProcessor("child2", TimestampProcessor::new, "processor")
            .addSink("sink1", OUTPUT_TOPIC_1, constantPartitioner(Integer.valueOf(i)), "child1")
            .addSink("sink2", OUTPUT_TOPIC_2, constantPartitioner(Integer.valueOf(i)), "child2");
    }

    /**
     * Adds source -> {@code ForwardingProcessor} -> sink to the shared topology, with the sink using a
     * {@code DroppingPartitioner}.
     */
    private Topology createSimpleTopologyWithDroppingPartitioner() {
        return this.topology
            .addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
            .addProcessor("processor", ForwardingProcessor::new, "source")
            .addSink("sink", OUTPUT_TOPIC_1, new DroppingPartitioner(), "processor");
    }

    /**
     * Adds source -> old-API stateful processor -> sink to the shared topology and attaches an
     * in-memory String/String key-value store to the processor via {@code addStateStore}.
     *
     * @param str name of the state store
     */
    @Deprecated
    private Topology createStatefulTopology(String str) {
        return this.topology
            .addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
            .addProcessor("processor", define(new OldAPIStatefulProcessor(str)), "source")
            .addStateStore(Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(str), Serdes.String(), Serdes.String()), "processor")
            .addSink("counts", OUTPUT_TOPIC_1, "processor");
    }

    /**
     * Adds source -> old-API stateful processor -> sink to the shared topology. The in-memory
     * String/String key-value store is provided through the processor supplier's {@code stores()}
     * rather than through a separate {@code addStateStore} call.
     *
     * @param str name of the state store
     */
    @Deprecated
    private Topology createConnectedStateStoreTopology(String str) {
        return this.topology
            .addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
            .addProcessor(
                "processor",
                defineWithStoresOldAPI(
                    () -> new OldAPIStatefulProcessor(str),
                    Collections.singleton(Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(str), Serdes.String(), Serdes.String()))),
                "source")
            .addSink("counts", OUTPUT_TOPIC_1, "processor");
    }

    /**
     * Wires input topic -> through topic -> output topic on the shared topology and registers the
     * through topic as an internal topic with empty properties.
     */
    private Topology createInternalRepartitioningTopology() {
        this.topology
            .addSource("source", INPUT_TOPIC_1)
            .addSink("sink0", THROUGH_TOPIC_1, "source")
            .addSource("source1", THROUGH_TOPIC_1)
            .addSink("sink1", OUTPUT_TOPIC_1, "source1");
        TopologyWrapper
            .getInternalTopologyBuilder(this.topology)
            .addInternalTopic(THROUGH_TOPIC_1, InternalTopicProperties.empty());
        return this.topology;
    }

    /**
     * Wires input topic -> {@code ValueTimestampProcessor} -> through topic -> output topic on the
     * shared topology and registers the through topic as an internal topic with empty properties.
     */
    private Topology createInternalRepartitioningWithValueTimestampTopology() {
        this.topology
            .addSource("source", INPUT_TOPIC_1)
            .addProcessor("processor", ValueTimestampProcessor::new, "source")
            .addSink("sink0", THROUGH_TOPIC_1, "processor")
            .addSource("source1", THROUGH_TOPIC_1)
            .addSink("sink1", OUTPUT_TOPIC_1, "source1");
        TopologyWrapper
            .getInternalTopologyBuilder(this.topology)
            .addInternalTopic(THROUGH_TOPIC_1, InternalTopicProperties.empty());
        return this.topology;
    }

    /**
     * Wires two source/sink pairs on the shared topology: the first copies the input topic to the
     * first output topic, the second reads that output topic and writes to the second output topic.
     */
    private Topology createForwardToSourceTopology() {
        return this.topology
            .addSource("source-1", INPUT_TOPIC_1)
            .addSink("sink-1", OUTPUT_TOPIC_1, "source-1")
            .addSource("source-2", OUTPUT_TOPIC_1)
            .addSink("sink-2", OUTPUT_TOPIC_2, "source-2");
    }

    /**
     * Adds two independent source -> {@code ForwardingProcessor} -> sink pipelines to the shared
     * topology: input topic 1 to output topic 1, and input topic 2 to output topic 2.
     *
     * @param i partition both sinks' partitioners always return
     */
    private Topology createSimpleMultiSourceTopology(int i) {
        return this.topology
            .addSource("source-1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
            .addProcessor("processor-1", ForwardingProcessor::new, "source-1")
            .addSink("sink-1", OUTPUT_TOPIC_1, constantPartitioner(Integer.valueOf(i)), "processor-1")
            .addSource("source-2", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_2)
            .addProcessor("processor-2", ForwardingProcessor::new, "source-2")
            .addSink("sink-2", OUTPUT_TOPIC_2, constantPartitioner(Integer.valueOf(i)), "processor-2");
    }

    /**
     * Adds source -> {@code AddHeaderProcessor} -> sink to the shared topology.
     */
    private Topology createAddHeaderTopology() {
        return this.topology
            .addSource("source-1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
            .addProcessor("processor-1", AddHeaderProcessor::new, "source-1")
            .addSink("sink-1", OUTPUT_TOPIC_1, "processor-1");
    }

    /**
     * Wraps an existing old-API processor in a supplier that hands out that same instance on every call.
     *
     * @param processor the instance to return from the supplier
     */
    private <K, V> ProcessorSupplier<K, V> define(org.apache.kafka.streams.processor.Processor<K, V> processor) {
        return () -> processor;
    }

    /**
     * Builds an old-API {@link ProcessorSupplier} that creates processors through the given factory
     * and exposes the given store builders via {@code stores()}.
     *
     * @param supplier factory invoked on every {@code get()} call
     * @param set      store builders returned from {@code stores()}
     * @return a supplier combining both
     */
    private <K, V> ProcessorSupplier<K, V> defineWithStoresOldAPI(final Supplier<org.apache.kafka.streams.processor.Processor<K, V>> supplier, final Set<StoreBuilder<?>> set) {
        return new ProcessorSupplier<K, V>() {
            // Must be named get(): under the decompiler's mangled name (m134get) the abstract
            // ProcessorSupplier#get() was left unimplemented.
            @Override
            public org.apache.kafka.streams.processor.Processor<K, V> get() {
                return supplier.get();
            }

            @Override
            public Set<StoreBuilder<?>> stores() {
                return set;
            }
        };
    }

    /**
     * Builds a new-API {@code ProcessorSupplier} that creates processors through the given factory
     * and exposes the given store builders via {@code stores()}.
     *
     * @param supplier factory invoked on every {@code get()} call
     * @param set      store builders returned from {@code stores()}
     * @return a supplier combining both
     */
    private <KIn, VIn, KOut, VOut> org.apache.kafka.streams.processor.api.ProcessorSupplier<KIn, VIn, KOut, VOut> defineWithStores(final Supplier<Processor<KIn, VIn, KOut, VOut>> supplier, final Set<StoreBuilder<?>> set) {
        return new org.apache.kafka.streams.processor.api.ProcessorSupplier<KIn, VIn, KOut, VOut>() {
            // Must be named get(): under the decompiler's mangled name (m135get) the abstract
            // ProcessorSupplier#get() was left unimplemented.
            @Override
            public Processor<KIn, VIn, KOut, VOut> get() {
                return supplier.get();
            }

            @Override
            public Set<StoreBuilder<?>> stores() {
                return set;
            }
        };
    }
}
