package org.apache.kafka.streams.kstream.internals;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.SlidingWindows;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils;
import org.apache.kafka.streams.test.TestRecord;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.class */
public class SuppressScenarioTest {
    /** Deserializer for the String keys (and String values) read back from output topics. */
    private static final StringDeserializer STRING_DESERIALIZER = new StringDeserializer();
    /** Serializer for the String keys and values piped into input topics. */
    private static final StringSerializer STRING_SERIALIZER = new StringSerializer();
    /** String serde shared by the topology definitions in this class. */
    private static final Serde<String> STRING_SERDE = Serdes.String();
    /** Deserializer for the Long counts read back from output topics. */
    private static final LongDeserializer LONG_DESERIALIZER = new LongDeserializer();
    /**
     * Driver configuration; every test instance points "state.dir" at a fresh temp directory.
     * The entry is passed through varargs directly rather than via a raw {@code Map.Entry[]},
     * which avoided nothing and produced an unchecked conversion.
     */
    private final Properties config = Utils.mkProperties(
        Utils.mkMap(Utils.mkEntry("state.dir", TestUtils.tempDirectory().getPath())));

    /**
     * Counts table values and suppresses the counts with a zero time limit, then asserts that
     * the suppressed output topic carries exactly the same records as the raw output topic.
     */
    @Test
    public void shouldImmediatelyEmitEventsWithZeroEmitAfter() {
        final StreamsBuilder builder = new StreamsBuilder();
        // NOTE(review): Materialized.with(...) is chained without an explicit type witness;
        // confirm the store type still infers as StreamsBuilder#table requires.
        final KTable<String, Long> valueCounts = builder
            .table(
                "input",
                Consumed.with(STRING_SERDE, STRING_SERDE),
                Materialized.with(STRING_SERDE, STRING_SERDE).withCachingDisabled().withLoggingDisabled())
            // Swap key and value so that the count is per distinct value.
            .groupBy((k, v) -> new KeyValue<>(v, k), Grouped.with(STRING_SERDE, STRING_SERDE))
            .count();
        valueCounts
            .suppress(Suppressed.untilTimeLimit(Duration.ZERO, Suppressed.BufferConfig.unbounded()))
            .toStream()
            .to("output-suppressed", Produced.with(STRING_SERDE, Serdes.Long()));
        valueCounts
            .toStream()
            .to("output-raw", Produced.with(STRING_SERDE, Serdes.Long()));

        // try-with-resources closes the driver exactly once, on success and on failure.
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), config)) {
            final TestInputTopic<String, String> inputTopic =
                driver.createInputTopic("input", STRING_SERIALIZER, STRING_SERIALIZER);
            inputTopic.pipeInput("k1", "v1", 0L);
            inputTopic.pipeInput("k1", "v2", 1L);
            inputTopic.pipeInput("k2", "v1", 2L);
            verify(
                drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER),
                Arrays.asList(
                    new KeyValueTimestamp<>("v1", 1L, 0L),
                    new KeyValueTimestamp<>("v1", 0L, 1L),
                    new KeyValueTimestamp<>("v2", 1L, 1L),
                    new KeyValueTimestamp<>("v1", 1L, 2L)));
            verify(
                drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER),
                Arrays.asList(
                    new KeyValueTimestamp<>("v1", 1L, 0L),
                    new KeyValueTimestamp<>("v1", 0L, 1L),
                    new KeyValueTimestamp<>("v2", 1L, 1L),
                    new KeyValueTimestamp<>("v1", 1L, 2L)));

            inputTopic.pipeInput("x", "x", 3L);
            verify(
                drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER),
                Collections.singletonList(new KeyValueTimestamp<>("x", 1L, 3L)));
            verify(
                drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER),
                Collections.singletonList(new KeyValueTimestamp<>("x", 1L, 3L)));

            inputTopic.pipeInput("x", "y", 4L);
            verify(
                drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER),
                Arrays.asList(
                    new KeyValueTimestamp<>("x", 0L, 4L),
                    new KeyValueTimestamp<>("y", 1L, 4L)));
            verify(
                drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER),
                Arrays.asList(
                    new KeyValueTimestamp<>("x", 0L, 4L),
                    new KeyValueTimestamp<>("y", 1L, 4L)));
        }
    }

    /**
     * Counts table values and suppresses the counts with a 2 ms time limit, then asserts which
     * records reach the suppressed output topic after each batch of input, next to the raw output.
     */
    @Test
    public void shouldSuppressIntermediateEventsWithTimeLimit() {
        final StreamsBuilder builder = new StreamsBuilder();
        // NOTE(review): Materialized.with(...) is chained without an explicit type witness;
        // confirm the store type still infers as StreamsBuilder#table requires.
        final KTable<String, Long> valueCounts = builder
            .table(
                "input",
                Consumed.with(STRING_SERDE, STRING_SERDE),
                Materialized.with(STRING_SERDE, STRING_SERDE).withCachingDisabled().withLoggingDisabled())
            // Swap key and value so that the count is per distinct value.
            .groupBy((k, v) -> new KeyValue<>(v, k), Grouped.with(STRING_SERDE, STRING_SERDE))
            .count();
        valueCounts
            .suppress(Suppressed.untilTimeLimit(Duration.ofMillis(2L), Suppressed.BufferConfig.unbounded()))
            .toStream()
            .to("output-suppressed", Produced.with(STRING_SERDE, Serdes.Long()));
        valueCounts
            .toStream()
            .to("output-raw", Produced.with(STRING_SERDE, Serdes.Long()));

        // try-with-resources closes the driver exactly once, on success and on failure.
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), config)) {
            final TestInputTopic<String, String> inputTopic =
                driver.createInputTopic("input", STRING_SERIALIZER, STRING_SERIALIZER);
            inputTopic.pipeInput("k1", "v1", 0L);
            inputTopic.pipeInput("k1", "v2", 1L);
            inputTopic.pipeInput("k2", "v1", 2L);
            verify(
                drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER),
                Arrays.asList(
                    new KeyValueTimestamp<>("v1", 1L, 0L),
                    new KeyValueTimestamp<>("v1", 0L, 1L),
                    new KeyValueTimestamp<>("v2", 1L, 1L),
                    new KeyValueTimestamp<>("v1", 1L, 2L)));
            verify(
                drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER),
                Collections.singletonList(new KeyValueTimestamp<>("v1", 1L, 2L)));

            // An unrelated record at timestamp 3 is enough for the buffered "v2" count to appear.
            inputTopic.pipeInput("tick", "tick", 3L);
            verify(
                drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER),
                Collections.singletonList(new KeyValueTimestamp<>("tick", 1L, 3L)));
            verify(
                drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER),
                Collections.singletonList(new KeyValueTimestamp<>("v2", 1L, 1L)));

            inputTopic.pipeInput("tick", "tock", 4L);
            verify(
                drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER),
                Arrays.asList(
                    new KeyValueTimestamp<>("tick", 0L, 4L),
                    new KeyValueTimestamp<>("tock", 1L, 4L)));
            verify(
                drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER),
                Collections.emptyList());
        }
    }

    /**
     * Counts table values and suppresses the counts with an effectively unlimited time limit but
     * a one-record buffer configured to emit early when full, then asserts which records reach
     * the suppressed output topic.
     */
    @Test
    public void shouldSuppressIntermediateEventsWithRecordLimit() {
        final StreamsBuilder builder = new StreamsBuilder();
        // NOTE(review): Materialized.with(...) passed to table() is chained without an explicit
        // type witness; confirm the store type still infers as StreamsBuilder#table requires.
        final KTable<String, Long> valueCounts = builder
            .table(
                "input",
                Consumed.with(STRING_SERDE, STRING_SERDE),
                Materialized.with(STRING_SERDE, STRING_SERDE).withCachingDisabled().withLoggingDisabled())
            // Swap key and value so that the count is per distinct value.
            .groupBy((k, v) -> new KeyValue<>(v, k), Grouped.with(STRING_SERDE, STRING_SERDE))
            .count(Materialized.with(STRING_SERDE, Serdes.Long()));
        valueCounts
            .suppress(Suppressed.untilTimeLimit(
                Duration.ofMillis(Long.MAX_VALUE),
                Suppressed.BufferConfig.maxRecords(1L).emitEarlyWhenFull()))
            .toStream()
            .to("output-suppressed", Produced.with(STRING_SERDE, Serdes.Long()));
        valueCounts
            .toStream()
            .to("output-raw", Produced.with(STRING_SERDE, Serdes.Long()));
        final Topology topology = builder.build();
        System.out.println(topology.describe());

        // try-with-resources closes the driver exactly once, on success and on failure.
        try (TopologyTestDriver driver = new TopologyTestDriver(topology, config)) {
            final TestInputTopic<String, String> inputTopic =
                driver.createInputTopic("input", STRING_SERIALIZER, STRING_SERIALIZER);
            inputTopic.pipeInput("k1", "v1", 0L);
            inputTopic.pipeInput("k1", "v2", 1L);
            inputTopic.pipeInput("k2", "v1", 2L);
            verify(
                drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER),
                Arrays.asList(
                    new KeyValueTimestamp<>("v1", 1L, 0L),
                    new KeyValueTimestamp<>("v1", 0L, 1L),
                    new KeyValueTimestamp<>("v2", 1L, 1L),
                    new KeyValueTimestamp<>("v1", 1L, 2L)));
            verify(
                drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER),
                Arrays.asList(
                    new KeyValueTimestamp<>("v1", 0L, 1L),
                    new KeyValueTimestamp<>("v2", 1L, 1L)));

            inputTopic.pipeInput("x", "x", 3L);
            verify(
                drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER),
                Collections.singletonList(new KeyValueTimestamp<>("x", 1L, 3L)));
            verify(
                drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER),
                Collections.singletonList(new KeyValueTimestamp<>("v1", 1L, 2L)));
        }
    }

    /**
     * Counts table values and suppresses the counts with an effectively unlimited time limit but
     * a 200-byte buffer configured to emit early when full, then asserts which records reach
     * the suppressed output topic.
     */
    @Test
    public void shouldSuppressIntermediateEventsWithBytesLimit() {
        final StreamsBuilder builder = new StreamsBuilder();
        // NOTE(review): Materialized.with(...) is chained without an explicit type witness;
        // confirm the store type still infers as StreamsBuilder#table requires.
        final KTable<String, Long> valueCounts = builder
            .table(
                "input",
                Consumed.with(STRING_SERDE, STRING_SERDE),
                Materialized.with(STRING_SERDE, STRING_SERDE).withCachingDisabled().withLoggingDisabled())
            // Swap key and value so that the count is per distinct value.
            .groupBy((k, v) -> new KeyValue<>(v, k), Grouped.with(STRING_SERDE, STRING_SERDE))
            .count();
        valueCounts
            .suppress(Suppressed.untilTimeLimit(
                Duration.ofMillis(Long.MAX_VALUE),
                Suppressed.BufferConfig.maxBytes(200L).emitEarlyWhenFull()))
            .toStream()
            .to("output-suppressed", Produced.with(STRING_SERDE, Serdes.Long()));
        valueCounts
            .toStream()
            .to("output-raw", Produced.with(STRING_SERDE, Serdes.Long()));
        final Topology topology = builder.build();
        System.out.println(topology.describe());

        // try-with-resources closes the driver exactly once, on success and on failure.
        try (TopologyTestDriver driver = new TopologyTestDriver(topology, config)) {
            final TestInputTopic<String, String> inputTopic =
                driver.createInputTopic("input", STRING_SERIALIZER, STRING_SERIALIZER);
            inputTopic.pipeInput("k1", "v1", 0L);
            inputTopic.pipeInput("k1", "v2", 1L);
            inputTopic.pipeInput("k2", "v1", 2L);
            verify(
                drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER),
                Arrays.asList(
                    new KeyValueTimestamp<>("v1", 1L, 0L),
                    new KeyValueTimestamp<>("v1", 0L, 1L),
                    new KeyValueTimestamp<>("v2", 1L, 1L),
                    new KeyValueTimestamp<>("v1", 1L, 2L)));
            verify(
                drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER),
                Arrays.asList(
                    new KeyValueTimestamp<>("v1", 0L, 1L),
                    new KeyValueTimestamp<>("v2", 1L, 1L)));

            inputTopic.pipeInput("x", "x", 3L);
            verify(
                drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER),
                Collections.singletonList(new KeyValueTimestamp<>("x", 1L, 3L)));
            verify(
                drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER),
                Collections.singletonList(new KeyValueTimestamp<>("v1", 1L, 2L)));
        }
    }

    /**
     * Counts records per key in 2 ms time windows with 1 ms grace and suppresses the counts
     * until the window closes, then asserts the raw updates and the suppressed results.
     */
    @Test
    @SuppressWarnings({"rawtypes", "unchecked"})
    public void shouldSupportFinalResultsForTimeWindows() {
        final StreamsBuilder builder = new StreamsBuilder();
        // Left raw: the windowed key type (presumably Windowed<String>) is not imported here.
        // NOTE(review): Materialized.as(...) is chained without an explicit type witness;
        // confirm the key/value/store types still infer as count() requires.
        final KTable windowCounts = builder
            .stream("input", Consumed.with(STRING_SERDE, STRING_SERDE))
            .groupBy((k, v) -> k, Grouped.with(STRING_SERDE, STRING_SERDE))
            .windowedBy(TimeWindows.of(Duration.ofMillis(2L)).grace(Duration.ofMillis(1L)))
            .count(Materialized.as("counts").withCachingDisabled());
        windowCounts
            .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
            .toStream()
            // Flatten the windowed key to its string form so the output can use a String serde.
            .map((windowedKey, count) -> new KeyValue<>(windowedKey.toString(), count))
            .to("output-suppressed", Produced.with(STRING_SERDE, Serdes.Long()));
        windowCounts
            .toStream()
            .map((windowedKey, count) -> new KeyValue<>(windowedKey.toString(), count))
            .to("output-raw", Produced.with(STRING_SERDE, Serdes.Long()));
        final Topology topology = builder.build();
        System.out.println(topology.describe());

        // try-with-resources closes the driver exactly once, on success and on failure.
        try (TopologyTestDriver driver = new TopologyTestDriver(topology, config)) {
            final TestInputTopic<String, String> inputTopic =
                driver.createInputTopic("input", STRING_SERIALIZER, STRING_SERIALIZER);
            inputTopic.pipeInput("k1", "v1", 0L);
            inputTopic.pipeInput("k1", "v1", 1L);
            inputTopic.pipeInput("k1", "v1", 2L);
            inputTopic.pipeInput("k1", "v1", 1L);
            inputTopic.pipeInput("k1", "v1", 0L);
            inputTopic.pipeInput("k1", "v1", 5L);
            // This last timestamp-0 record has no matching entry in the expected raw output below.
            inputTopic.pipeInput("k1", "v1", 0L);
            verify(
                drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER),
                Arrays.asList(
                    new KeyValueTimestamp<>("[k1@0/2]", 1L, 0L),
                    new KeyValueTimestamp<>("[k1@0/2]", 2L, 1L),
                    new KeyValueTimestamp<>("[k1@2/4]", 1L, 2L),
                    new KeyValueTimestamp<>("[k1@0/2]", 3L, 1L),
                    new KeyValueTimestamp<>("[k1@0/2]", 4L, 1L),
                    new KeyValueTimestamp<>("[k1@4/6]", 1L, 5L)));
            verify(
                drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER),
                Arrays.asList(
                    new KeyValueTimestamp<>("[k1@0/2]", 4L, 1L),
                    new KeyValueTimestamp<>("[k1@2/4]", 1L, 2L)));
        }
    }

    /**
     * Counts records per key in 2 ms time windows with 2 ms grace and suppresses the counts
     * until the window closes; the final input record jumps to timestamp 30. Asserts the raw
     * updates and the suppressed results.
     */
    @Test
    @SuppressWarnings({"rawtypes", "unchecked"})
    public void shouldSupportFinalResultsForTimeWindowsWithLargeJump() {
        final StreamsBuilder builder = new StreamsBuilder();
        // Left raw: the windowed key type (presumably Windowed<String>) is not imported here.
        // NOTE(review): Materialized.as(...) is chained without an explicit type witness;
        // confirm the key/value/store types still infer as count() requires.
        final KTable windowCounts = builder
            .stream("input", Consumed.with(STRING_SERDE, STRING_SERDE))
            .groupBy((k, v) -> k, Grouped.with(STRING_SERDE, STRING_SERDE))
            .windowedBy(TimeWindows.of(Duration.ofMillis(2L)).grace(Duration.ofMillis(2L)))
            .count(Materialized.as("counts").withCachingDisabled().withKeySerde(STRING_SERDE));
        windowCounts
            .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
            .toStream()
            // Flatten the windowed key to its string form so the output can use a String serde.
            .map((windowedKey, count) -> new KeyValue<>(windowedKey.toString(), count))
            .to("output-suppressed", Produced.with(STRING_SERDE, Serdes.Long()));
        windowCounts
            .toStream()
            .map((windowedKey, count) -> new KeyValue<>(windowedKey.toString(), count))
            .to("output-raw", Produced.with(STRING_SERDE, Serdes.Long()));
        final Topology topology = builder.build();
        System.out.println(topology.describe());

        // try-with-resources closes the driver exactly once, on success and on failure.
        try (TopologyTestDriver driver = new TopologyTestDriver(topology, config)) {
            final TestInputTopic<String, String> inputTopic =
                driver.createInputTopic("input", STRING_SERIALIZER, STRING_SERIALIZER);
            inputTopic.pipeInput("k1", "v1", 0L);
            inputTopic.pipeInput("k1", "v1", 1L);
            inputTopic.pipeInput("k1", "v1", 2L);
            inputTopic.pipeInput("k1", "v1", 0L);
            inputTopic.pipeInput("k1", "v1", 3L);
            inputTopic.pipeInput("k1", "v1", 0L);
            inputTopic.pipeInput("k1", "v1", 4L);
            // This timestamp-0 record has no matching entry in the expected raw output below.
            inputTopic.pipeInput("k1", "v1", 0L);
            inputTopic.pipeInput("k1", "v1", 30L);
            verify(
                drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER),
                Arrays.asList(
                    new KeyValueTimestamp<>("[k1@0/2]", 1L, 0L),
                    new KeyValueTimestamp<>("[k1@0/2]", 2L, 1L),
                    new KeyValueTimestamp<>("[k1@2/4]", 1L, 2L),
                    new KeyValueTimestamp<>("[k1@0/2]", 3L, 1L),
                    new KeyValueTimestamp<>("[k1@2/4]", 2L, 3L),
                    new KeyValueTimestamp<>("[k1@0/2]", 4L, 1L),
                    new KeyValueTimestamp<>("[k1@4/6]", 1L, 4L),
                    new KeyValueTimestamp<>("[k1@30/32]", 1L, 30L)));
            verify(
                drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER),
                Arrays.asList(
                    new KeyValueTimestamp<>("[k1@0/2]", 4L, 1L),
                    new KeyValueTimestamp<>("[k1@2/4]", 2L, 3L),
                    new KeyValueTimestamp<>("[k1@4/6]", 1L, 4L)));
        }
    }

    /**
     * Counts records per key in sliding windows (5 ms time difference, 15 ms grace) and
     * suppresses the counts until the window closes. The raw output is sorted by key and then
     * timestamp before comparison; the suppressed output is compared in emission order.
     */
    @Test
    @SuppressWarnings({"rawtypes", "unchecked"})
    public void shouldSupportFinalResultsForSlidingWindows() {
        final StreamsBuilder builder = new StreamsBuilder();
        // Left raw: the windowed key type (presumably Windowed<String>) is not imported here.
        // NOTE(review): Materialized.as(...) is chained without an explicit type witness;
        // confirm the key/value/store types still infer as count() requires.
        final KTable windowCounts = builder
            .stream("input", Consumed.with(STRING_SERDE, STRING_SERDE))
            .groupBy((k, v) -> k, Grouped.with(STRING_SERDE, STRING_SERDE))
            .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(Duration.ofMillis(5L), Duration.ofMillis(15L)))
            .count(Materialized.as("counts").withCachingDisabled().withKeySerde(STRING_SERDE));
        windowCounts
            .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
            .toStream()
            // Flatten the windowed key to its string form so the output can use a String serde.
            .map((windowedKey, count) -> new KeyValue<>(windowedKey.toString(), count))
            .to("output-suppressed", Produced.with(STRING_SERDE, Serdes.Long()));
        windowCounts
            .toStream()
            .map((windowedKey, count) -> new KeyValue<>(windowedKey.toString(), count))
            .to("output-raw", Produced.with(STRING_SERDE, Serdes.Long()));
        final Topology topology = builder.build();
        System.out.println(topology.describe());

        // try-with-resources closes the driver exactly once, on success and on failure.
        try (TopologyTestDriver driver = new TopologyTestDriver(topology, config)) {
            final TestInputTopic<String, String> inputTopic =
                driver.createInputTopic("input", STRING_SERIALIZER, STRING_SERIALIZER);
            inputTopic.pipeInput("k1", "v1", 10L);
            inputTopic.pipeInput("k1", "v1", 11L);
            inputTopic.pipeInput("k1", "v1", 10L);
            inputTopic.pipeInput("k1", "v1", 13L);
            inputTopic.pipeInput("k1", "v1", 10L);
            inputTopic.pipeInput("k1", "v1", 24L);
            inputTopic.pipeInput("k1", "v1", 5L);
            inputTopic.pipeInput("k1", "v1", 7L);
            inputTopic.pipeInput("k1", "v1", 90L);

            // Explicit lambda parameter types are required: without a target type the chained
            // Comparator.comparing(...) call would infer Object and lose getKey()/timestamp().
            final Comparator<TestRecord<String, Long>> byKeyThenTimestamp =
                Comparator.comparing((TestRecord<String, Long> record) -> record.getKey())
                    .thenComparing((TestRecord<String, Long> record) -> record.timestamp());
            final List<TestRecord<String, Long>> rawRecords =
                drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER);
            rawRecords.sort(byKeyThenTimestamp);
            verify(
                rawRecords,
                Arrays.asList(
                    new KeyValueTimestamp<>("[k1@11/16]", 1L, 11L),
                    new KeyValueTimestamp<>("[k1@11/16]", 2L, 13L),
                    new KeyValueTimestamp<>("[k1@12/17]", 1L, 13L),
                    new KeyValueTimestamp<>("[k1@19/24]", 1L, 24L),
                    new KeyValueTimestamp<>("[k1@5/10]", 1L, 10L),
                    new KeyValueTimestamp<>("[k1@5/10]", 2L, 10L),
                    new KeyValueTimestamp<>("[k1@5/10]", 3L, 10L),
                    new KeyValueTimestamp<>("[k1@5/10]", 4L, 10L),
                    new KeyValueTimestamp<>("[k1@5/10]", 5L, 10L),
                    new KeyValueTimestamp<>("[k1@6/11]", 2L, 11L),
                    new KeyValueTimestamp<>("[k1@6/11]", 3L, 11L),
                    new KeyValueTimestamp<>("[k1@6/11]", 4L, 11L),
                    new KeyValueTimestamp<>("[k1@6/11]", 5L, 11L),
                    new KeyValueTimestamp<>("[k1@8/13]", 4L, 13L),
                    new KeyValueTimestamp<>("[k1@8/13]", 5L, 13L),
                    new KeyValueTimestamp<>("[k1@85/90]", 1L, 90L)));
            verify(
                drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER),
                Arrays.asList(
                    new KeyValueTimestamp<>("[k1@5/10]", 5L, 10L),
                    new KeyValueTimestamp<>("[k1@6/11]", 5L, 11L),
                    new KeyValueTimestamp<>("[k1@8/13]", 5L, 13L),
                    new KeyValueTimestamp<>("[k1@11/16]", 2L, 13L),
                    new KeyValueTimestamp<>("[k1@12/17]", 1L, 13L),
                    new KeyValueTimestamp<>("[k1@19/24]", 1L, 24L)));
        }
    }

    /**
     * Counts records per key in session windows (5 ms gap, zero grace) and suppresses the counts
     * until the window closes, then asserts the raw updates (including null-valued records) and
     * the suppressed results.
     */
    @Test
    @SuppressWarnings({"rawtypes", "unchecked"})
    public void shouldSupportFinalResultsForSessionWindows() {
        final StreamsBuilder builder = new StreamsBuilder();
        // Left raw: the windowed key type (presumably Windowed<String>) is not imported here.
        // NOTE(review): Materialized.as(...) is chained without an explicit type witness;
        // confirm the key/value/store types still infer as count() requires.
        final KTable sessionCounts = builder
            .stream("input", Consumed.with(STRING_SERDE, STRING_SERDE))
            .groupBy((k, v) -> k, Grouped.with(STRING_SERDE, STRING_SERDE))
            .windowedBy(SessionWindows.with(Duration.ofMillis(5L)).grace(Duration.ofMillis(0L)))
            .count(Materialized.as("counts").withCachingDisabled());
        sessionCounts
            .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
            .toStream()
            // Flatten the windowed key to its string form so the output can use a String serde.
            .map((windowedKey, count) -> new KeyValue<>(windowedKey.toString(), count))
            .to("output-suppressed", Produced.with(STRING_SERDE, Serdes.Long()));
        sessionCounts
            .toStream()
            .map((windowedKey, count) -> new KeyValue<>(windowedKey.toString(), count))
            .to("output-raw", Produced.with(STRING_SERDE, Serdes.Long()));
        final Topology topology = builder.build();
        System.out.println(topology.describe());

        // try-with-resources closes the driver exactly once, on success and on failure.
        try (TopologyTestDriver driver = new TopologyTestDriver(topology, config)) {
            final TestInputTopic<String, String> inputTopic =
                driver.createInputTopic("input", STRING_SERIALIZER, STRING_SERIALIZER);
            inputTopic.pipeInput("k1", "v1", 0L);
            inputTopic.pipeInput("k1", "v1", 5L);
            inputTopic.pipeInput("k1", "v1", 1L);
            inputTopic.pipeInput("k2", "v1", 11L);
            // This second timestamp-5 record for k1 has no matching entry in the expected raw output.
            inputTopic.pipeInput("k1", "v1", 5L);
            inputTopic.pipeInput("k1", "v1", 30L);
            verify(
                drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER),
                Arrays.asList(
                    new KeyValueTimestamp<>("[k1@0/0]", 1L, 0L),
                    new KeyValueTimestamp<>("[k1@0/0]", null, 0L),
                    new KeyValueTimestamp<>("[k1@0/5]", 2L, 5L),
                    new KeyValueTimestamp<>("[k1@0/5]", null, 5L),
                    new KeyValueTimestamp<>("[k1@0/5]", 3L, 5L),
                    new KeyValueTimestamp<>("[k2@11/11]", 1L, 11L),
                    new KeyValueTimestamp<>("[k1@30/30]", 1L, 30L)));
            verify(
                drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER),
                Arrays.asList(
                    new KeyValueTimestamp<>("[k1@0/5]", 3L, 5L),
                    new KeyValueTimestamp<>("[k2@11/11]", 1L, 11L)));
        }
    }

    /**
     * Places a 10 ms time-limit suppression directly on a source table, ahead of a
     * groupBy/count, and asserts the single count record that reaches the output topic.
     */
    @Test
    public void shouldWorkBeforeGroupBy() {
        // NOTE(review): the topic name is borrowed from an unrelated test utility constant;
        // it is only required to be the same for the table and the input topic below.
        final String inputTopicName = AssignmentTestUtils.TOPIC_PREFIX;
        final StreamsBuilder builder = new StreamsBuilder();
        builder
            .table(inputTopicName, Consumed.with(Serdes.String(), Serdes.String()))
            .suppress(Suppressed.untilTimeLimit(Duration.ofMillis(10L), Suppressed.BufferConfig.unbounded()))
            .groupBy(KeyValue::pair, Grouped.with(Serdes.String(), Serdes.String()))
            .count()
            .toStream()
            .to("output", Produced.with(Serdes.String(), Serdes.Long()));

        // try-with-resources closes the driver exactly once, on success and on failure.
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), config)) {
            final TestInputTopic<String, String> inputTopic =
                driver.createInputTopic(inputTopicName, STRING_SERIALIZER, STRING_SERIALIZER);
            inputTopic.pipeInput("A", "a", 0L);
            inputTopic.pipeInput("tick", "tick", 10L);
            verify(
                drainProducerRecords(driver, "output", STRING_DESERIALIZER, LONG_DESERIALIZER),
                Collections.singletonList(new KeyValueTimestamp<>("A", 1L, 0L)));
        }
    }

    /**
     * Outer-joins the "left" table with a "right" table that is suppressed with a 10 ms time
     * limit, and asserts the join output after each input record.
     */
    @Test
    public void shouldWorkBeforeJoinRight() {
        final StreamsBuilder builder = new StreamsBuilder();
        final KTable<String, String> left =
            builder.table("left", Consumed.with(Serdes.String(), Serdes.String()));
        final KTable<String, String> suppressedRight = builder
            .table("right", Consumed.with(Serdes.String(), Serdes.String()))
            .suppress(Suppressed.untilTimeLimit(Duration.ofMillis(10L), Suppressed.BufferConfig.unbounded()));
        left
            .outerJoin(suppressedRight, (l, r) -> String.format("(%s,%s)", l, r))
            .toStream()
            .to("output", Produced.with(Serdes.String(), Serdes.String()));

        // try-with-resources closes the driver exactly once, on success and on failure.
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), config)) {
            final TestInputTopic<String, String> inputTopicRight =
                driver.createInputTopic("right", STRING_SERIALIZER, STRING_SERIALIZER);
            final TestInputTopic<String, String> inputTopicLeft =
                driver.createInputTopic("left", STRING_SERIALIZER, STRING_SERIALIZER);

            inputTopicRight.pipeInput("B", "1", 0L);
            inputTopicRight.pipeInput("A", "1", 0L);
            // Nothing is expected yet: both right-side records are still held by the suppression.
            verify(
                drainProducerRecords(driver, "output", STRING_DESERIALIZER, STRING_DESERIALIZER),
                Collections.emptyList());

            inputTopicRight.pipeInput("tick", "tick", 10L);
            verify(
                drainProducerRecords(driver, "output", STRING_DESERIALIZER, STRING_DESERIALIZER),
                Arrays.asList(
                    new KeyValueTimestamp<>("A", "(null,1)", 0L),
                    new KeyValueTimestamp<>("B", "(null,1)", 0L)));

            inputTopicRight.pipeInput("A", "2", 11L);
            verify(
                drainProducerRecords(driver, "output", STRING_DESERIALIZER, STRING_DESERIALIZER),
                Collections.emptyList());

            inputTopicLeft.pipeInput("A", "a", 12L);
            verify(
                drainProducerRecords(driver, "output", STRING_DESERIALIZER, STRING_DESERIALIZER),
                Collections.singletonList(new KeyValueTimestamp<>("A", "(a,1)", 12L)));

            inputTopicLeft.pipeInput("B", "b", 12L);
            verify(
                drainProducerRecords(driver, "output", STRING_DESERIALIZER, STRING_DESERIALIZER),
                Collections.singletonList(new KeyValueTimestamp<>("B", "(b,1)", 12L)));

            inputTopicLeft.pipeInput("A", "b", 13L);
            verify(
                drainProducerRecords(driver, "output", STRING_DESERIALIZER, STRING_DESERIALIZER),
                Collections.singletonList(new KeyValueTimestamp<>("A", "(b,1)", 13L)));

            inputTopicRight.pipeInput("tick", "tick1", 21L);
            verify(
                drainProducerRecords(driver, "output", STRING_DESERIALIZER, STRING_DESERIALIZER),
                Arrays.asList(
                    new KeyValueTimestamp<>("tick", "(null,tick1)", 21L),
                    new KeyValueTimestamp<>("A", "(b,2)", 13L)));
        }
    }

    /**
     * Outer-joins a "left" table that is suppressed with a 10 ms time limit with the "right"
     * table, and asserts the join output after each input record.
     */
    @Test
    public void shouldWorkBeforeJoinLeft() {
        final StreamsBuilder builder = new StreamsBuilder();
        final KTable<String, String> suppressedLeft = builder
            .table("left", Consumed.with(Serdes.String(), Serdes.String()))
            .suppress(Suppressed.untilTimeLimit(Duration.ofMillis(10L), Suppressed.BufferConfig.unbounded()));
        final KTable<String, String> right =
            builder.table("right", Consumed.with(Serdes.String(), Serdes.String()));
        suppressedLeft
            .outerJoin(right, (l, r) -> String.format("(%s,%s)", l, r))
            .toStream()
            .to("output", Produced.with(Serdes.String(), Serdes.String()));

        // try-with-resources closes the driver exactly once, on success and on failure.
        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), config)) {
            final TestInputTopic<String, String> inputTopicRight =
                driver.createInputTopic("right", STRING_SERIALIZER, STRING_SERIALIZER);
            final TestInputTopic<String, String> inputTopicLeft =
                driver.createInputTopic("left", STRING_SERIALIZER, STRING_SERIALIZER);

            inputTopicLeft.pipeInput("B", "1", 0L);
            inputTopicLeft.pipeInput("A", "1", 0L);
            // Nothing is expected yet: both left-side records are still held by the suppression.
            verify(
                drainProducerRecords(driver, "output", STRING_DESERIALIZER, STRING_DESERIALIZER),
                Collections.emptyList());

            inputTopicLeft.pipeInput("tick", "tick", 10L);
            verify(
                drainProducerRecords(driver, "output", STRING_DESERIALIZER, STRING_DESERIALIZER),
                Arrays.asList(
                    new KeyValueTimestamp<>("A", "(1,null)", 0L),
                    new KeyValueTimestamp<>("B", "(1,null)", 0L)));

            inputTopicLeft.pipeInput("A", "2", 11L);
            verify(
                drainProducerRecords(driver, "output", STRING_DESERIALIZER, STRING_DESERIALIZER),
                Collections.emptyList());

            inputTopicRight.pipeInput("A", "a", 12L);
            verify(
                drainProducerRecords(driver, "output", STRING_DESERIALIZER, STRING_DESERIALIZER),
                Collections.singletonList(new KeyValueTimestamp<>("A", "(1,a)", 12L)));

            inputTopicRight.pipeInput("B", "b", 12L);
            verify(
                drainProducerRecords(driver, "output", STRING_DESERIALIZER, STRING_DESERIALIZER),
                Collections.singletonList(new KeyValueTimestamp<>("B", "(1,b)", 12L)));

            inputTopicRight.pipeInput("A", "b", 13L);
            verify(
                drainProducerRecords(driver, "output", STRING_DESERIALIZER, STRING_DESERIALIZER),
                Collections.singletonList(new KeyValueTimestamp<>("A", "(1,b)", 13L)));

            inputTopicLeft.pipeInput("tick", "tick1", 21L);
            verify(
                drainProducerRecords(driver, "output", STRING_DESERIALIZER, STRING_DESERIALIZER),
                Arrays.asList(
                    new KeyValueTimestamp<>("tick", "(tick1,null)", 21L),
                    new KeyValueTimestamp<>("A", "(2,b)", 13L)));
        }
    }

    /**
     * Builds a topology that cogroups streams "one" and "two", aggregates them in 15-minute
     * time windows and suppresses the result until the window closes. The test makes no
     * assertions; it only requires that the topology definition can be constructed.
     */
    @Test
    public void shouldWorkWithCogrouped() {
        final StreamsBuilder builder = new StreamsBuilder();
        // Typed (not raw) grouped streams: the aggregator lambdas concatenate with '+', which
        // needs the value parameter to be a String rather than Object.
        final KGroupedStream<String, String> streamOne = builder
            .stream("one", Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()));
        final KGroupedStream<String, String> streamTwo = builder
            .stream("two", Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()));
        streamOne
            .cogroup((key, value, aggregate) -> aggregate + value)
            .cogroup(streamTwo, (key, value, aggregate) -> aggregate + value)
            .windowedBy(TimeWindows.of(Duration.ofMinutes(15L)))
            .aggregate(() -> "", Named.as("test"), Materialized.as("store"))
            .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
            .toStream();
    }

    /**
     * Asserts that the drained records match the expected key/value/timestamp triples, in order.
     * Each expected triple is compared as a {@link TestRecord} with null headers.
     *
     * @param list  the records actually read from an output topic
     * @param list2 the expected key/value/timestamp triples
     * @throws AssertionError if the sizes differ or any record differs; the message lists all
     *                        actual records followed by the expected list
     */
    private static <K, V> void verify(List<TestRecord<K, V>> list, List<KeyValueTimestamp<K, V>> list2) {
        final int actualCount = list.size();
        if (actualCount != list2.size()) {
            throw new AssertionError(printRecords(list) + " != " + list2);
        }
        for (int index = 0; index < actualCount; index++) {
            final TestRecord<K, V> actual = list.get(index);
            final KeyValueTimestamp<K, V> expected = list2.get(index);
            final TestRecord<K, V> wanted = new TestRecord<>(
                expected.key(), expected.value(), (Headers) null, Long.valueOf(expected.timestamp()));
            try {
                MatcherAssert.assertThat(actual, CoreMatchers.equalTo(wanted));
            } catch (AssertionError mismatch) {
                // Re-throw with the full actual/expected listing; keep the matcher failure as cause.
                throw new AssertionError(printRecords(list) + " != " + list2, mismatch);
            }
        }
    }

    /**
     * Reads the records of one output topic from the test driver as a list.
     *
     * @param topologyTestDriver the driver to read from
     * @param str                the name of the output topic
     * @param deserializer       deserializer for the record keys
     * @param deserializer2      deserializer for the record values
     * @return the list returned by the output topic's {@code readRecordsToList()}
     */
    private static <K, V> List<TestRecord<K, V>> drainProducerRecords(TopologyTestDriver topologyTestDriver, String str, Deserializer<K> deserializer, Deserializer<V> deserializer2) {
        return topologyTestDriver.createOutputTopic(str, deserializer, deserializer2).readRecordsToList();
    }

    /**
     * Renders the records as a bracketed block with one two-space-indented record per line,
     * for use in assertion failure messages.
     *
     * @param list the records to render
     * @return "[", a newline, one line per record, then "]"
     */
    private static <K, V> String printRecords(List<TestRecord<K, V>> list) {
        final StringBuilder rendered = new StringBuilder("[\n");
        for (final TestRecord<K, V> record : list) {
            rendered.append("  ");
            rendered.append(record);
            rendered.append("\n");
        }
        return rendered.append("]").toString();
    }
}
