package org.apache.kafka.streams.kstream.internals;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Merger;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.SlidingWindows;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockInitializer;
import org.apache.kafka.test.MockReducer;
import org.apache.kafka.test.StreamsTestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.class */
public class KGroupedStreamImplTest {
    // Name of the input topic; same value as the "topic" literal the tests read from and pipe into.
    private static final String TOPIC = "topic";
    // Store name used by the tests that expect a TopologyException for an invalid store name.
    private static final String INVALID_STORE_NAME = "~foo bar~";
    // Grouped stream under test; assigned in before() ahead of every test.
    private KGroupedStream<String, String> groupedStream;
    // Builder that the grouped stream is created from and that each test driver's topology is built from.
    private final StreamsBuilder builder = new StreamsBuilder();
    // Streams configuration created with String serdes; handed to every TopologyTestDriver.
    private final Properties props = StreamsTestUtils.getStreamsConfig((Serde<?>) Serdes.String(), (Serde<?>) Serdes.String());

    /** Creates the grouped stream under test from the input topic, with String serdes for keys and values. */
    @BeforeEach
    public void before() {
        groupedStream = builder
            .stream(TOPIC, Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()));
    }

    /** {@code cogroup} must reject a null aggregator with a NullPointerException. */
    @Test
    public void shouldNotHaveNullAggregatorOnCogroup() {
        Assertions.assertThrows(NullPointerException.class, () -> groupedStream.cogroup(null));
    }

    /** {@code reduce} must reject a null reducer with a NullPointerException. */
    @Test
    public void shouldNotHaveNullReducerOnReduce() {
        Assertions.assertThrows(NullPointerException.class, () -> groupedStream.reduce(null));
    }

    /** {@code reduce} with an invalid store name must fail with a TopologyException. */
    @Test
    public void shouldNotHaveInvalidStoreNameOnReduce() {
        Assertions.assertThrows(
            TopologyException.class,
            () -> groupedStream.reduce(MockReducer.STRING_ADDER, Materialized.as(INVALID_STORE_NAME)));
    }

    /** A time-windowed {@code reduce} must reject a null reducer with a NullPointerException. */
    @Test
    public void shouldNotHaveNullReducerWithWindowedReduce() {
        Assertions.assertThrows(
            NullPointerException.class,
            () -> groupedStream
                .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMillis(10L)))
                .reduce(null, Materialized.as("store")));
    }

    /** {@code windowedBy} must reject a null {@code Windows} argument with a NullPointerException. */
    @Test
    public void shouldNotHaveNullWindowsWithWindowedReduce() {
        // The cast selects the Windows overload of windowedBy.
        Assertions.assertThrows(NullPointerException.class, () -> groupedStream.windowedBy((Windows<?>) null));
    }

    /** A time-windowed {@code reduce} with an invalid store name must fail with a TopologyException. */
    @Test
    public void shouldNotHaveInvalidStoreNameWithWindowedReduce() {
        Assertions.assertThrows(
            TopologyException.class,
            () -> groupedStream
                .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMillis(10L)))
                .reduce(MockReducer.STRING_ADDER, Materialized.as(INVALID_STORE_NAME)));
    }

    /** {@code aggregate} must reject a null initializer with a NullPointerException. */
    @Test
    public void shouldNotHaveNullInitializerOnAggregate() {
        Assertions.assertThrows(
            NullPointerException.class,
            () -> groupedStream.aggregate(null, MockAggregator.TOSTRING_ADDER, Materialized.as("store")));
    }

    /** {@code aggregate} must reject a null aggregator with a NullPointerException. */
    @Test
    public void shouldNotHaveNullAdderOnAggregate() {
        Assertions.assertThrows(
            NullPointerException.class,
            () -> groupedStream.aggregate(MockInitializer.STRING_INIT, null, Materialized.as("store")));
    }

    /** {@code aggregate} with an invalid store name must fail with a TopologyException. */
    @Test
    public void shouldNotHaveInvalidStoreNameOnAggregate() {
        Assertions.assertThrows(
            TopologyException.class,
            () -> groupedStream.aggregate(
                MockInitializer.STRING_INIT,
                MockAggregator.TOSTRING_ADDER,
                Materialized.as(INVALID_STORE_NAME)));
    }

    /** A time-windowed {@code aggregate} must reject a null initializer with a NullPointerException. */
    @Test
    public void shouldNotHaveNullInitializerOnWindowedAggregate() {
        Assertions.assertThrows(
            NullPointerException.class,
            () -> groupedStream
                .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMillis(10L)))
                .aggregate(null, MockAggregator.TOSTRING_ADDER, Materialized.as("store")));
    }

    /** A time-windowed {@code aggregate} must reject a null aggregator with a NullPointerException. */
    @Test
    public void shouldNotHaveNullAdderOnWindowedAggregate() {
        Assertions.assertThrows(
            NullPointerException.class,
            () -> groupedStream
                .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMillis(10L)))
                .aggregate(MockInitializer.STRING_INIT, null, Materialized.as("store")));
    }

    /** {@code windowedBy} must reject a null {@code Windows} argument with a NullPointerException. */
    @Test
    public void shouldNotHaveNullWindowsOnWindowedAggregate() {
        // The cast selects the Windows overload of windowedBy.
        Assertions.assertThrows(NullPointerException.class, () -> groupedStream.windowedBy((Windows<?>) null));
    }

    /** A time-windowed {@code aggregate} with an invalid store name must fail with a TopologyException. */
    @Test
    public void shouldNotHaveInvalidStoreNameOnWindowedAggregate() {
        Assertions.assertThrows(
            TopologyException.class,
            () -> groupedStream
                .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMillis(10L)))
                .aggregate(
                    MockInitializer.STRING_INIT,
                    MockAggregator.TOSTRING_ADDER,
                    Materialized.as(INVALID_STORE_NAME)));
    }

    /** A sliding-windowed {@code reduce} must reject a null reducer with a NullPointerException. */
    @Test
    public void shouldNotHaveNullReducerWithSlidingWindowedReduce() {
        Assertions.assertThrows(
            NullPointerException.class,
            () -> groupedStream
                .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(10L), Duration.ofMillis(100L)))
                .reduce(null, Materialized.as("store")));
    }

    /** {@code windowedBy} must reject a null {@code SlidingWindows} argument with a NullPointerException. */
    @Test
    public void shouldNotHaveNullWindowsWithSlidingWindowedReduce() {
        // The cast selects the SlidingWindows overload of windowedBy.
        Assertions.assertThrows(NullPointerException.class, () -> groupedStream.windowedBy((SlidingWindows) null));
    }

    /** A sliding-windowed {@code reduce} with an invalid store name must fail with a TopologyException. */
    @Test
    public void shouldNotHaveInvalidStoreNameWithSlidingWindowedReduce() {
        Assertions.assertThrows(
            TopologyException.class,
            () -> groupedStream
                .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(10L), Duration.ofMillis(100L)))
                .reduce(MockReducer.STRING_ADDER, Materialized.as(INVALID_STORE_NAME)));
    }

    /** A sliding-windowed {@code aggregate} must reject a null initializer with a NullPointerException. */
    @Test
    public void shouldNotHaveNullInitializerOnSlidingWindowedAggregate() {
        Assertions.assertThrows(
            NullPointerException.class,
            () -> groupedStream
                .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(10L), Duration.ofMillis(100L)))
                .aggregate(null, MockAggregator.TOSTRING_ADDER, Materialized.as("store")));
    }

    /** A sliding-windowed {@code aggregate} must reject a null aggregator with a NullPointerException. */
    @Test
    public void shouldNotHaveNullAdderOnSlidingWindowedAggregate() {
        Assertions.assertThrows(
            NullPointerException.class,
            () -> groupedStream
                .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(10L), Duration.ofMillis(100L)))
                .aggregate(MockInitializer.STRING_INIT, null, Materialized.as("store")));
    }

    /** A sliding-windowed {@code aggregate} with an invalid store name must fail with a TopologyException. */
    @Test
    public void shouldNotHaveInvalidStoreNameOnSlidingWindowedAggregate() {
        Assertions.assertThrows(
            TopologyException.class,
            () -> groupedStream
                .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(10L), Duration.ofMillis(100L)))
                .aggregate(
                    MockInitializer.STRING_INIT,
                    MockAggregator.TOSTRING_ADDER,
                    Materialized.as(INVALID_STORE_NAME)));
    }

    /** Counts per sliding window into a named store and checks the emitted records via doCountSlidingWindows. */
    @Test
    public void shouldCountSlidingWindows() {
        final MockApiProcessorSupplier<Windowed<String>, Long, Void, Void> supplier = new MockApiProcessorSupplier<>();
        groupedStream
            .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(500L), Duration.ofMillis(2000L)))
            .count(Materialized.as("aggregate-by-key-windowed"))
            .toStream()
            .process(supplier);

        doCountSlidingWindows(supplier);
    }

    /** Same as the named-store sliding-window count test, but without passing a Materialized to count(). */
    @Test
    public void shouldCountSlidingWindowsWithInternalStoreName() {
        final MockApiProcessorSupplier<Windowed<String>, Long, Void, Void> supplier = new MockApiProcessorSupplier<>();
        groupedStream
            .windowedBy(SlidingWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(500L), Duration.ofMillis(2000L)))
            .count()
            .toStream()
            .process(supplier);

        doCountSlidingWindows(supplier);
    }

    /**
     * Builds the topology, pipes twelve records for keys "1", "2" and "3" into the input topic and
     * compares the records captured by the supplier's processor with the expected windowed counts.
     *
     * <p>The captured records are sorted by key and then by window start before the comparison.
     * {@code List.sort} is stable, so several updates to the same window keep their emission order.
     *
     * @param mockApiProcessorSupplier supplier whose captured processor received the count updates
     */
    private void doCountSlidingWindows(final MockApiProcessorSupplier<Windowed<String>, Long, Void, Void> mockApiProcessorSupplier) {
        // try-with-resources closes the driver exactly once, before the captured output is inspected,
        // and also when piping input fails.
        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            final TestInputTopic<String, String> inputTopic =
                driver.createInputTopic(TOPIC, new StringSerializer(), new StringSerializer());
            inputTopic.pipeInput("1", "A", 500L);
            inputTopic.pipeInput("1", "A", 999L);
            inputTopic.pipeInput("1", "A", 600L);
            inputTopic.pipeInput("2", "B", 500L);
            inputTopic.pipeInput("2", "B", 600L);
            inputTopic.pipeInput("2", "B", 700L);
            inputTopic.pipeInput("3", "C", 501L);
            inputTopic.pipeInput("1", "A", 1000L);
            inputTopic.pipeInput("1", "A", 1000L);
            inputTopic.pipeInput("2", "B", 1000L);
            inputTopic.pipeInput("2", "B", 1000L);
            inputTopic.pipeInput("3", "C", 600L);
        }

        final Comparator<KeyValueTimestamp<Windowed<String>, Long>> byKeyThenWindowStart =
            Comparator.comparing((KeyValueTimestamp<Windowed<String>, Long> record) -> record.key().key())
                .thenComparing((KeyValueTimestamp<Windowed<String>, Long> record) -> record.key().window().start());

        final ArrayList<KeyValueTimestamp<Windowed<String>, Long>> processed =
            mockApiProcessorSupplier.theCapturedProcessor().processed();
        processed.sort(byKeyThenWindowStart);

        MatcherAssert.assertThat(processed, CoreMatchers.equalTo(Arrays.asList(
            // key "1"
            windowedCount("1", 0L, 500L, 1L, 500L),
            windowedCount("1", 100L, 600L, 2L, 600L),
            windowedCount("1", 499L, 999L, 2L, 999L),
            windowedCount("1", 499L, 999L, 3L, 999L),
            windowedCount("1", 500L, 1000L, 4L, 1000L),
            windowedCount("1", 500L, 1000L, 5L, 1000L),
            windowedCount("1", 501L, 1001L, 1L, 999L),
            windowedCount("1", 501L, 1001L, 2L, 999L),
            windowedCount("1", 501L, 1001L, 3L, 1000L),
            windowedCount("1", 501L, 1001L, 4L, 1000L),
            windowedCount("1", 601L, 1101L, 1L, 999L),
            windowedCount("1", 601L, 1101L, 2L, 1000L),
            windowedCount("1", 601L, 1101L, 3L, 1000L),
            windowedCount("1", 1000L, 1500L, 1L, 1000L),
            windowedCount("1", 1000L, 1500L, 2L, 1000L),
            // key "2"
            windowedCount("2", 0L, 500L, 1L, 500L),
            windowedCount("2", 100L, 600L, 2L, 600L),
            windowedCount("2", 200L, 700L, 3L, 700L),
            windowedCount("2", 500L, 1000L, 4L, 1000L),
            windowedCount("2", 500L, 1000L, 5L, 1000L),
            windowedCount("2", 501L, 1001L, 1L, 600L),
            windowedCount("2", 501L, 1001L, 2L, 700L),
            windowedCount("2", 501L, 1001L, 3L, 1000L),
            windowedCount("2", 501L, 1001L, 4L, 1000L),
            windowedCount("2", 601L, 1101L, 1L, 700L),
            windowedCount("2", 601L, 1101L, 2L, 1000L),
            windowedCount("2", 601L, 1101L, 3L, 1000L),
            windowedCount("2", 701L, 1201L, 1L, 1000L),
            windowedCount("2", 701L, 1201L, 2L, 1000L),
            // key "3"
            windowedCount("3", 1L, 501L, 1L, 501L),
            windowedCount("3", 100L, 600L, 2L, 600L),
            windowedCount("3", 502L, 1002L, 1L, 600L))));
    }

    /** Builds one expected record: the count for {@code key} in the time window [windowStart, windowEnd) at {@code timestamp}. */
    private static KeyValueTimestamp<Windowed<String>, Long> windowedCount(final String key,
                                                                          final long windowStart,
                                                                          final long windowEnd,
                                                                          final long count,
                                                                          final long timestamp) {
        return new KeyValueTimestamp<>(new Windowed<>(key, new TimeWindow(windowStart, windowEnd)), count, timestamp);
    }

    /**
     * Builds the topology, pipes six records for keys "1" and "2" into the input topic and checks the
     * last value and timestamp captured for each expected session window.
     *
     * @param mockApiProcessorSupplier supplier whose captured processor received the aggregation updates
     */
    private void doAggregateSessionWindows(final MockApiProcessorSupplier<Windowed<String>, Integer, Void, Void> mockApiProcessorSupplier) {
        // try-with-resources closes the driver exactly once, before the captured output is inspected.
        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            final TestInputTopic<String, String> inputTopic =
                driver.createInputTopic(TOPIC, new StringSerializer(), new StringSerializer());
            inputTopic.pipeInput("1", "1", 10L);
            inputTopic.pipeInput("2", "2", 15L);
            inputTopic.pipeInput("1", "1", 30L);
            inputTopic.pipeInput("1", "1", 70L);
            inputTopic.pipeInput("1", "1", 100L);
            inputTopic.pipeInput("1", "1", 90L);
        }

        final Map<Windowed<String>, ValueAndTimestamp<Integer>> result =
            mockApiProcessorSupplier.theCapturedProcessor().lastValueAndTimestampPerKey();
        Assertions.assertEquals(
            ValueAndTimestamp.make(2, 30L),
            result.get(new Windowed<>("1", new SessionWindow(10L, 30L))));
        Assertions.assertEquals(
            ValueAndTimestamp.make(1, 15L),
            result.get(new Windowed<>("2", new SessionWindow(15L, 15L))));
        Assertions.assertEquals(
            ValueAndTimestamp.make(3, 100L),
            result.get(new Windowed<>("1", new SessionWindow(70L, 100L))));
    }

    /**
     * Aggregates per session window into the "session-store" store, verifies the emitted records via
     * doAggregateSessionWindows and checks that the table reports that store name as queryable.
     */
    @Test
    public void shouldAggregateSessionWindows() {
        final MockApiProcessorSupplier<Windowed<String>, Integer, Void, Void> supplier = new MockApiProcessorSupplier<>();
        final KTable<Windowed<String>, Integer> table = groupedStream
            .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMillis(30L)))
            .aggregate(
                () -> 0,
                (aggKey, value, aggregate) -> aggregate + 1,
                (aggKey, aggOne, aggTwo) -> aggOne + aggTwo,
                // Explicit type arguments let withValueSerde accept the Integer serde.
                Materialized.<String, Integer, SessionStore<Bytes, byte[]>>as("session-store")
                    .withValueSerde(Serdes.Integer()));
        table.toStream().process(supplier);

        doAggregateSessionWindows(supplier);
        // JUnit expects (expected, actual); this order keeps failure messages accurate.
        Assertions.assertEquals("session-store", table.queryableStoreName());
    }

    /** Session-window aggregation with a Materialized that carries only serdes (null key serde, Integer value serde). */
    @Test
    public void shouldAggregateSessionWindowsWithInternalStoreName() {
        final MockApiProcessorSupplier<Windowed<String>, Integer, Void, Void> supplier = new MockApiProcessorSupplier<>();
        groupedStream
            .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMillis(30L)))
            .aggregate(
                () -> 0,
                (aggKey, value, aggregate) -> aggregate + 1,
                (aggKey, aggOne, aggTwo) -> aggOne + aggTwo,
                Materialized.with(null, Serdes.Integer()))
            .toStream()
            .process(supplier);

        doAggregateSessionWindows(supplier);
    }

    /**
     * Builds the topology, pipes six records for keys "1" and "2" into the input topic and checks the
     * last count and timestamp captured for each expected session window.
     *
     * @param mockApiProcessorSupplier supplier whose captured processor received the count updates
     */
    private void doCountSessionWindows(final MockApiProcessorSupplier<Windowed<String>, Long, Void, Void> mockApiProcessorSupplier) {
        // try-with-resources closes the driver exactly once, before the captured output is inspected.
        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            final TestInputTopic<String, String> inputTopic =
                driver.createInputTopic(TOPIC, new StringSerializer(), new StringSerializer());
            inputTopic.pipeInput("1", "1", 10L);
            inputTopic.pipeInput("2", "2", 15L);
            inputTopic.pipeInput("1", "1", 30L);
            inputTopic.pipeInput("1", "1", 70L);
            inputTopic.pipeInput("1", "1", 100L);
            inputTopic.pipeInput("1", "1", 90L);
        }

        final Map<Windowed<String>, ValueAndTimestamp<Long>> result =
            mockApiProcessorSupplier.theCapturedProcessor().lastValueAndTimestampPerKey();
        Assertions.assertEquals(
            ValueAndTimestamp.make(2L, 30L),
            result.get(new Windowed<>("1", new SessionWindow(10L, 30L))));
        Assertions.assertEquals(
            ValueAndTimestamp.make(1L, 15L),
            result.get(new Windowed<>("2", new SessionWindow(15L, 15L))));
        Assertions.assertEquals(
            ValueAndTimestamp.make(3L, 100L),
            result.get(new Windowed<>("1", new SessionWindow(70L, 100L))));
    }

    /**
     * Counts per session window into the "session-store" store, verifies the emitted records via
     * doCountSessionWindows and checks that the table reports that store name as queryable.
     */
    @Test
    public void shouldCountSessionWindows() {
        final MockApiProcessorSupplier<Windowed<String>, Long, Void, Void> supplier = new MockApiProcessorSupplier<>();
        final KTable<Windowed<String>, Long> table = groupedStream
            .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMillis(30L)))
            .count(Materialized.as("session-store"));
        table.toStream().process(supplier);

        doCountSessionWindows(supplier);
        // JUnit expects (expected, actual); this order keeps failure messages accurate.
        Assertions.assertEquals("session-store", table.queryableStoreName());
    }

    /** Session-window count without a Materialized: records must match and no queryable store name is reported. */
    @Test
    public void shouldCountSessionWindowsWithInternalStoreName() {
        final MockApiProcessorSupplier<Windowed<String>, Long, Void, Void> supplier = new MockApiProcessorSupplier<>();
        final KTable<Windowed<String>, Long> table = groupedStream
            .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMillis(30L)))
            .count();
        table.toStream().process(supplier);

        doCountSessionWindows(supplier);
        Assertions.assertNull(table.queryableStoreName());
    }

    /**
     * Builds the topology, pipes six records for keys "1" and "2" into the input topic and checks the
     * last reduced value and timestamp captured for each expected session window.
     *
     * @param mockApiProcessorSupplier supplier whose captured processor received the reduce updates
     */
    private void doReduceSessionWindows(final MockApiProcessorSupplier<Windowed<String>, String, Void, Void> mockApiProcessorSupplier) {
        // try-with-resources closes the driver exactly once, before the captured output is inspected.
        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            final TestInputTopic<String, String> inputTopic =
                driver.createInputTopic(TOPIC, new StringSerializer(), new StringSerializer());
            inputTopic.pipeInput("1", "A", 10L);
            inputTopic.pipeInput("2", "Z", 15L);
            inputTopic.pipeInput("1", "B", 30L);
            inputTopic.pipeInput("1", "A", 70L);
            inputTopic.pipeInput("1", "B", 100L);
            inputTopic.pipeInput("1", "C", 90L);
        }

        final Map<Windowed<String>, ValueAndTimestamp<String>> result =
            mockApiProcessorSupplier.theCapturedProcessor().lastValueAndTimestampPerKey();
        Assertions.assertEquals(
            ValueAndTimestamp.make("A:B", 30L),
            result.get(new Windowed<>("1", new SessionWindow(10L, 30L))));
        Assertions.assertEquals(
            ValueAndTimestamp.make("Z", 15L),
            result.get(new Windowed<>("2", new SessionWindow(15L, 15L))));
        Assertions.assertEquals(
            ValueAndTimestamp.make("A:B:C", 100L),
            result.get(new Windowed<>("1", new SessionWindow(70L, 100L))));
    }

    /**
     * Reduces per session window into the "session-store" store, verifies the emitted records via
     * doReduceSessionWindows and checks that the table reports that store name as queryable.
     */
    @Test
    public void shouldReduceSessionWindows() {
        final MockApiProcessorSupplier<Windowed<String>, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
        final KTable<Windowed<String>, String> table = groupedStream
            .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMillis(30L)))
            .reduce((value1, value2) -> value1 + ":" + value2, Materialized.as("session-store"));
        table.toStream().process(supplier);

        doReduceSessionWindows(supplier);
        // JUnit expects (expected, actual); this order keeps failure messages accurate.
        Assertions.assertEquals("session-store", table.queryableStoreName());
    }

    /** Session-window reduce without a Materialized: records must match and no queryable store name is reported. */
    @Test
    public void shouldReduceSessionWindowsWithInternalStoreName() {
        final MockApiProcessorSupplier<Windowed<String>, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
        final KTable<Windowed<String>, String> table = groupedStream
            .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMillis(30L)))
            .reduce((value1, value2) -> value1 + ":" + value2);
        table.toStream().process(supplier);

        doReduceSessionWindows(supplier);
        Assertions.assertNull(table.queryableStoreName());
    }

    /** A session-windowed {@code reduce} must reject a null reducer with a NullPointerException. */
    @Test
    public void shouldNotAcceptNullReducerWhenReducingSessionWindows() {
        Assertions.assertThrows(
            NullPointerException.class,
            () -> groupedStream
                .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMillis(30L)))
                .reduce(null, Materialized.as("store")));
    }

    /** {@code windowedBy} must reject a null {@code SessionWindows} argument with a NullPointerException. */
    @Test
    public void shouldNotAcceptNullSessionWindowsReducingSessionWindows() {
        // The cast selects the SessionWindows overload of windowedBy.
        Assertions.assertThrows(NullPointerException.class, () -> groupedStream.windowedBy((SessionWindows) null));
    }

    /** A session-windowed {@code reduce} with an invalid store name must fail with a TopologyException. */
    @Test
    public void shouldNotAcceptInvalidStoreNameWhenReducingSessionWindows() {
        Assertions.assertThrows(
            TopologyException.class,
            () -> groupedStream
                .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMillis(30L)))
                .reduce(MockReducer.STRING_ADDER, Materialized.as(INVALID_STORE_NAME)));
    }

    /**
     * A session-windowed {@code reduce} given a null reducer and a Materialized created from a null
     * store name must fail with a NullPointerException.
     */
    @Test
    public void shouldNotAcceptNullStateStoreSupplierWhenReducingSessionWindows() {
        // NOTE(review): both the reducer and the store name are null, so this test cannot tell which
        // of the two triggers the exception - confirm the intent against the test name.
        Assertions.assertThrows(
            NullPointerException.class,
            () -> groupedStream
                .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMillis(30L)))
                .reduce(null, Materialized.as((String) null)));
    }

    /** A session-windowed {@code aggregate} must reject a null initializer with a NullPointerException. */
    @Test
    public void shouldNotAcceptNullInitializerWhenAggregatingSessionWindows() {
        Assertions.assertThrows(
            NullPointerException.class,
            () -> groupedStream
                .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMillis(30L)))
                .aggregate(
                    null,
                    MockAggregator.TOSTRING_ADDER,
                    (aggKey, aggOne, aggTwo) -> null,
                    Materialized.as("storeName")));
    }

    /** A session-windowed {@code aggregate} must reject a null aggregator with a NullPointerException. */
    @Test
    public void shouldNotAcceptNullAggregatorWhenAggregatingSessionWindows() {
        Assertions.assertThrows(
            NullPointerException.class,
            () -> groupedStream
                .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMillis(30L)))
                .aggregate(
                    MockInitializer.STRING_INIT,
                    null,
                    (aggKey, aggOne, aggTwo) -> null,
                    Materialized.as("storeName")));
    }

    /** A session-windowed {@code aggregate} must reject a null session merger with a NullPointerException. */
    @Test
    public void shouldNotAcceptNullSessionMergerWhenAggregatingSessionWindows() {
        Assertions.assertThrows(
            NullPointerException.class,
            () -> groupedStream
                .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMillis(30L)))
                .aggregate(
                    MockInitializer.STRING_INIT,
                    MockAggregator.TOSTRING_ADDER,
                    null,
                    Materialized.as("storeName")));
    }

    /** {@code windowedBy} must reject a null {@code SessionWindows} argument with a NullPointerException. */
    @Test
    public void shouldNotAcceptNullSessionWindowsWhenAggregatingSessionWindows() {
        // The cast selects the SessionWindows overload of windowedBy.
        Assertions.assertThrows(NullPointerException.class, () -> groupedStream.windowedBy((SessionWindows) null));
    }

    /**
     * A session-windowed {@code aggregate} with a Materialized that carries only serdes (no store name)
     * must be accepted; the test passes when the call completes without throwing.
     */
    @Test
    public void shouldAcceptNullStoreNameWhenAggregatingSessionWindows() {
        groupedStream
            .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMillis(10L)))
            .aggregate(
                MockInitializer.STRING_INIT,
                MockAggregator.TOSTRING_ADDER,
                (aggKey, aggOne, aggTwo) -> null,
                Materialized.with(Serdes.String(), Serdes.String()));
    }

    /** A session-windowed {@code aggregate} with an invalid store name must fail with a TopologyException. */
    @Test
    public void shouldNotAcceptInvalidStoreNameWhenAggregatingSessionWindows() {
        Assertions.assertThrows(
            TopologyException.class,
            () -> groupedStream
                .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMillis(10L)))
                .aggregate(
                    MockInitializer.STRING_INIT,
                    MockAggregator.TOSTRING_ADDER,
                    (aggKey, aggOne, aggTwo) -> null,
                    Materialized.as(INVALID_STORE_NAME)));
    }

    /** {@code reduce} must reject a null Materialized with a NullPointerException. */
    @Test
    public void shouldThrowNullPointerOnReduceWhenMaterializedIsNull() {
        Assertions.assertThrows(
            NullPointerException.class,
            () -> groupedStream.reduce(MockReducer.STRING_ADDER, null));
    }

    /** {@code aggregate} must reject a null Materialized with a NullPointerException. */
    @Test
    public void shouldThrowNullPointerOnAggregateWhenMaterializedIsNull() {
        Assertions.assertThrows(
            NullPointerException.class,
            () -> groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, null));
    }

    /** {@code count} must reject a null Materialized with a NullPointerException. */
    @Test
    public void shouldThrowNullPointerOnCountWhenMaterializedIsNull() {
        // The cast is kept so the call explicitly targets the Materialized overload of count.
        Assertions.assertThrows(
            NullPointerException.class,
            () -> groupedStream.count((Materialized) null));
    }

    /**
     * Counts records per key into the "count" store, feeds the shared test data through a driver and
     * checks the store contents through both the plain and the timestamped key-value store views.
     */
    @Test
    public void shouldCountAndMaterializeResults() {
        // Explicit type arguments let withKeySerde accept the String serde.
        groupedStream.count(
            Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("count")
                .withKeySerde(Serdes.String()));

        // try-with-resources closes the driver exactly once, on success and on failure.
        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            processData(driver);

            final KeyValueStore<String, Long> counts = driver.getKeyValueStore("count");
            MatcherAssert.assertThat(counts.get("1"), CoreMatchers.equalTo(3L));
            MatcherAssert.assertThat(counts.get("2"), CoreMatchers.equalTo(1L));
            MatcherAssert.assertThat(counts.get("3"), CoreMatchers.equalTo(2L));

            final KeyValueStore<String, ValueAndTimestamp<Long>> timestampedCounts =
                driver.getTimestampedKeyValueStore("count");
            MatcherAssert.assertThat(timestampedCounts.get("1"), CoreMatchers.equalTo(ValueAndTimestamp.make(3L, 10L)));
            MatcherAssert.assertThat(timestampedCounts.get("2"), CoreMatchers.equalTo(ValueAndTimestamp.make(1L, 1L)));
            MatcherAssert.assertThat(timestampedCounts.get("3"), CoreMatchers.equalTo(ValueAndTimestamp.make(2L, 9L)));
        }
    }

    /**
     * Feeds the shared test data through a count topology and checks that a "Skipping record" message
     * for offset 6 was logged by {@code KStreamAggregate}.
     */
    @Test
    public void shouldLogAndMeasureSkipsInAggregate() {
        // Explicit type arguments let withKeySerde accept the String serde.
        groupedStream.count(
            Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("count")
                .withKeySerde(Serdes.String()));

        // Both resources are managed by try-with-resources, so the driver is closed (before the
        // appender) even when processing or the assertion fails.
        try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(KStreamAggregate.class);
             final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            processData(driver);

            MatcherAssert.assertThat(
                appender.getMessages(),
                CoreMatchers.hasItem("Skipping record due to null key or value. topic=[topic] partition=[0] offset=[6]"));
        }
    }

    /**
     * Reduces values per key into the "reduce" store, feeds the shared test data through a driver and
     * checks the store contents through both the plain and the timestamped key-value store views.
     */
    @Test
    public void shouldReduceAndMaterializeResults() {
        // Explicit type arguments let the serde setters accept the String serdes.
        groupedStream.reduce(
            MockReducer.STRING_ADDER,
            Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("reduce")
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.String()));

        // try-with-resources closes the driver exactly once, on success and on failure.
        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            processData(driver);

            final KeyValueStore<String, String> reduced = driver.getKeyValueStore("reduce");
            MatcherAssert.assertThat(reduced.get("1"), CoreMatchers.equalTo("A+C+D"));
            MatcherAssert.assertThat(reduced.get("2"), CoreMatchers.equalTo("B"));
            MatcherAssert.assertThat(reduced.get("3"), CoreMatchers.equalTo("E+F"));

            final KeyValueStore<String, ValueAndTimestamp<String>> timestampedReduced =
                driver.getTimestampedKeyValueStore("reduce");
            MatcherAssert.assertThat(timestampedReduced.get("1"), CoreMatchers.equalTo(ValueAndTimestamp.make("A+C+D", 10L)));
            MatcherAssert.assertThat(timestampedReduced.get("2"), CoreMatchers.equalTo(ValueAndTimestamp.make("B", 1L)));
            MatcherAssert.assertThat(timestampedReduced.get("3"), CoreMatchers.equalTo(ValueAndTimestamp.make("E+F", 9L)));
        }
    }

    /**
     * Feeds the shared test data through a reduce topology and checks that a "Skipping record" message
     * for offset 6 was logged by {@code KStreamReduce}.
     */
    @Test
    public void shouldLogAndMeasureSkipsInReduce() {
        // Explicit type arguments let the serde setters accept the String serdes.
        groupedStream.reduce(
            MockReducer.STRING_ADDER,
            Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("reduce")
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.String()));

        // Both resources are managed by try-with-resources, so the driver is closed (before the
        // appender) even when processing or the assertion fails.
        try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(KStreamReduce.class);
             final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            processData(driver);

            MatcherAssert.assertThat(
                appender.getMessages(),
                CoreMatchers.hasItem("Skipping record due to null key or value. topic=[topic] partition=[0] offset=[6]"));
        }
    }

    /**
     * Aggregates the grouped stream into the store {@code "aggregate"} and verifies the result
     * through both store views exposed by the test driver: the plain key-value view and the
     * timestamped view.
     *
     * <p>The expected timestamps (10, 1 and 9) equal the largest timestamp piped per key by
     * {@code processData}.
     */
    @Test
    public void shouldAggregateAndMaterializeResults() {
        this.groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, Materialized.as("aggregate").withKeySerde(Serdes.String()).withValueSerde(Serdes.String()));
        // try-with-resources closes the driver exactly once, on success and on failure alike.
        try (final TopologyTestDriver driver = new TopologyTestDriver(this.builder.build(), this.props)) {
            processData(driver);

            // Plain view: values only.
            final KeyValueStore<String, String> plainStore = driver.getKeyValueStore("aggregate");
            MatcherAssert.assertThat(plainStore.get("1"), CoreMatchers.equalTo("0+A+C+D"));
            MatcherAssert.assertThat(plainStore.get("2"), CoreMatchers.equalTo("0+B"));
            MatcherAssert.assertThat(plainStore.get("3"), CoreMatchers.equalTo("0+E+F"));

            // Timestamped view of the same store: value plus timestamp.
            final KeyValueStore<String, ValueAndTimestamp<String>> timestampedStore =
                driver.getTimestampedKeyValueStore("aggregate");
            MatcherAssert.assertThat(timestampedStore.get("1"), CoreMatchers.equalTo(ValueAndTimestamp.make("0+A+C+D", 10L)));
            MatcherAssert.assertThat(timestampedStore.get("2"), CoreMatchers.equalTo(ValueAndTimestamp.make("0+B", 1L)));
            MatcherAssert.assertThat(timestampedStore.get("3"), CoreMatchers.equalTo(ValueAndTimestamp.make("0+E+F", 9L)));
        }
    }

    /**
     * Aggregates the grouped stream without an explicit {@code Materialized} and forwards the
     * resulting table's changelog stream to a capturing mock processor. The test then checks the
     * last value and timestamp the processor saw for each key.
     */
    @Test
    public void shouldAggregateWithDefaultSerdes() {
        final MockApiProcessorSupplier<String, String, Void, Void> supplier = new MockApiProcessorSupplier<>();
        this.groupedStream
            .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER)
            .toStream()
            .process(supplier);
        // try-with-resources closes the driver exactly once, on success and on failure alike.
        try (final TopologyTestDriver driver = new TopologyTestDriver(this.builder.build(), this.props)) {
            processData(driver);

            // Look the captured results up once instead of repeating the call chain per assertion.
            final Map<String, ValueAndTimestamp<String>> lastPerKey =
                supplier.theCapturedProcessor().lastValueAndTimestampPerKey();
            MatcherAssert.assertThat(lastPerKey.get("1"), CoreMatchers.equalTo(ValueAndTimestamp.make("0+A+C+D", 10L)));
            MatcherAssert.assertThat(lastPerKey.get("2"), CoreMatchers.equalTo(ValueAndTimestamp.make("0+B", 1L)));
            MatcherAssert.assertThat(lastPerKey.get("3"), CoreMatchers.equalTo(ValueAndTimestamp.make("0+E+F", 9L)));
        }
    }

    /**
     * Pipes the shared fixture into the input topic {@code "topic"} of the given driver: six
     * timestamped String records for keys "1", "2" and "3", followed by one record for key "3"
     * with a {@code null} value and no explicit timestamp.
     *
     * <p>The order of the calls matters: callers assert on per-key results and on the offset of
     * the trailing {@code null}-value record.
     *
     * @param topologyTestDriver driver whose topology reads from {@code "topic"}
     */
    private void processData(TopologyTestDriver topologyTestDriver) {
        final TestInputTopic<String, String> input =
            topologyTestDriver.createInputTopic("topic", new StringSerializer(), new StringSerializer());

        input.pipeInput("1", "A", 5L);
        input.pipeInput("2", "B", 1L);
        // Timestamp 3 is lower than the 5 already piped for key "1".
        input.pipeInput("1", "C", 3L);
        input.pipeInput("1", "D", 10L);
        input.pipeInput("3", "E", 8L);
        input.pipeInput("3", "F", 9L);
        // The cast selects the (key, value) overload; a bare null would be ambiguous.
        input.pipeInput("3", (String) null);
    }

    /**
     * Drives the already-built windowed-count topology with a fixed set of records and verifies the
     * exact sequence of windowed counts forwarded to the capturing processor.
     *
     * <p>The expected output encodes two things beyond plain counting. The third update for key
     * "1" keeps timestamp 499 although its record was piped with timestamp 100. The final record
     * (key "3", timestamp 100) is piped after records with timestamp 500 and is still expected to
     * be counted into window [0, 500).
     *
     * @param mockApiProcessorSupplier supplier whose captured processor received the windowed counts
     */
    private void doCountWindowed(MockApiProcessorSupplier<Windowed<String>, Long, Void, Void> mockApiProcessorSupplier) {
        // The driver is closed exactly once when this block exits, normally or exceptionally.
        try (final TopologyTestDriver driver = new TopologyTestDriver(this.builder.build(), this.props)) {
            final TestInputTopic<String, String> input =
                driver.createInputTopic("topic", new StringSerializer(), new StringSerializer());
            input.pipeInput("1", "A", 0L);
            input.pipeInput("1", "A", 499L);
            input.pipeInput("1", "A", 100L);
            input.pipeInput("2", "B", 0L);
            input.pipeInput("2", "B", 100L);
            input.pipeInput("2", "B", 200L);
            input.pipeInput("3", "C", 1L);
            input.pipeInput("1", "A", 500L);
            input.pipeInput("1", "A", 500L);
            input.pipeInput("2", "B", 500L);
            input.pipeInput("2", "B", 500L);
            input.pipeInput("3", "B", 100L);
        }

        // Asserting after the driver is closed keeps a failing assertion from re-closing it.
        MatcherAssert.assertThat(
            mockApiProcessorSupplier.theCapturedProcessor().processed(),
            CoreMatchers.equalTo(Arrays.asList(
                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(0L, 500L)), 1L, 0L),
                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(0L, 500L)), 2L, 499L),
                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(0L, 500L)), 3L, 499L),
                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(0L, 500L)), 1L, 0L),
                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(0L, 500L)), 2L, 100L),
                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(0L, 500L)), 3L, 200L),
                new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(0L, 500L)), 1L, 1L),
                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(500L, 1000L)), 1L, 500L),
                new KeyValueTimestamp<>(new Windowed<>("1", new TimeWindow(500L, 1000L)), 2L, 500L),
                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(500L, 1000L)), 1L, 500L),
                new KeyValueTimestamp<>(new Windowed<>("2", new TimeWindow(500L, 1000L)), 2L, 500L),
                new KeyValueTimestamp<>(new Windowed<>("3", new TimeWindow(0L, 500L)), 2L, 100L))));
    }

    /**
     * Counts records per key in 500 ms time windows (100 ms grace), materialized under the explicit
     * store name {@code "aggregate-by-key-windowed"}, and delegates input and verification to
     * {@code doCountWindowed}.
     */
    @Test
    public void shouldCountWindowed() {
        final MockApiProcessorSupplier<Windowed<String>, Long, Void, Void> supplier = new MockApiProcessorSupplier<>();
        final TimeWindows windows = TimeWindows.ofSizeAndGrace(Duration.ofMillis(500L), Duration.ofMillis(100L));

        this.groupedStream
            .windowedBy(windows)
            .count(Materialized.as("aggregate-by-key-windowed"))
            .toStream()
            .process(supplier);

        doCountWindowed(supplier);
    }

    /**
     * Same scenario as the explicitly materialized windowed count, but {@code count()} is called
     * without a {@code Materialized}, so no store name is supplied by the test. Input and
     * verification are delegated to {@code doCountWindowed}.
     */
    @Test
    public void shouldCountWindowedWithInternalStoreName() {
        final MockApiProcessorSupplier<Windowed<String>, Long, Void, Void> supplier = new MockApiProcessorSupplier<>();
        final TimeWindows windows = TimeWindows.ofSizeAndGrace(Duration.ofMillis(500L), Duration.ofMillis(100L));

        this.groupedStream
            .windowedBy(windows)
            .count()
            .toStream()
            .process(supplier);

        doCountWindowed(supplier);
    }
}
