package org.apache.kafka.streams.kstream.internals;

import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Properties;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.StreamJoined;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.test.MockApiProcessor;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.StreamsTestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.class */
public class KStreamKStreamLeftJoinTest {
    private final String topic1 = AssignmentTestUtils.TP_1_NAME;
    private final String topic2 = AssignmentTestUtils.TP_2_NAME;
    private final Consumed<Integer, String> consumed = Consumed.with(Serdes.Integer(), Serdes.String());
    private static final KeyValueTimestamp[] EMPTY = new KeyValueTimestamp[0];
    private static final Properties PROPS = StreamsTestUtils.getStreamsConfig((Serde<?>) Serdes.String(), (Serde<?>) Serdes.String());

    @Test
    public void testLeftJoinWithSpuriousResultFixDisabledOldApi() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        int[] iArr = {0, 1, 2, 3};
        MockApiProcessorSupplier mockApiProcessorSupplier = new MockApiProcessorSupplier();
        streamsBuilder.stream(AssignmentTestUtils.TP_1_NAME, this.consumed).leftJoin(streamsBuilder.stream(AssignmentTestUtils.TP_2_NAME, this.consumed), MockValueJoiner.TOSTRING_JOINER, JoinWindows.of(Duration.ofMillis(100L)), StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())).process(mockApiProcessorSupplier, new String[0]);
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(streamsBuilder.build(PROPS), PROPS);
        try {
            TestInputTopic createInputTopic = topologyTestDriver.createInputTopic(AssignmentTestUtils.TP_1_NAME, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            TestInputTopic createInputTopic2 = topologyTestDriver.createInputTopic(AssignmentTestUtils.TP_2_NAME, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            MockApiProcessor theCapturedProcessor = mockApiProcessorSupplier.theCapturedProcessor();
            Assertions.assertEquals(2, topologyTestDriver.getAllStateStores().size());
            for (int i = 0; i < 2; i++) {
                createInputTopic.pipeInput(Integer.valueOf(iArr[i]), "A" + iArr[i]);
            }
            theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "A0+null", 0L), new KeyValueTimestamp<>(1, "A1+null", 0L));
            for (int i2 = 0; i2 < 2; i2++) {
                createInputTopic2.pipeInput(Integer.valueOf(iArr[i2]), "a" + iArr[i2]);
            }
            theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "A0+a0", 0L), new KeyValueTimestamp<>(1, "A1+a1", 0L));
            topologyTestDriver.close();
        } catch (Throwable th) {
            try {
                topologyTestDriver.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void testLeftJoinDuplicatesWithSpuriousResultFixDisabledOldApi() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        MockApiProcessorSupplier mockApiProcessorSupplier = new MockApiProcessorSupplier();
        streamsBuilder.stream(AssignmentTestUtils.TP_1_NAME, this.consumed).leftJoin(streamsBuilder.stream(AssignmentTestUtils.TP_2_NAME, this.consumed), MockValueJoiner.TOSTRING_JOINER, JoinWindows.of(Duration.ofMillis(100L)).grace(Duration.ofMillis(10L)), StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())).process(mockApiProcessorSupplier, new String[0]);
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(streamsBuilder.build(PROPS), PROPS);
        try {
            TestInputTopic createInputTopic = topologyTestDriver.createInputTopic(AssignmentTestUtils.TP_1_NAME, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            TestInputTopic createInputTopic2 = topologyTestDriver.createInputTopic(AssignmentTestUtils.TP_2_NAME, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            MockApiProcessor theCapturedProcessor = mockApiProcessorSupplier.theCapturedProcessor();
            Assertions.assertEquals(2, topologyTestDriver.getAllStateStores().size());
            createInputTopic.pipeInput(0, "A0", 0L);
            createInputTopic.pipeInput(0, "A0-0", 0L);
            createInputTopic2.pipeInput(0, "a0", 0L);
            theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "A0+null", 0L), new KeyValueTimestamp<>(0, "A0-0+null", 0L), new KeyValueTimestamp<>(0, "A0+a0", 0L), new KeyValueTimestamp<>(0, "A0-0+a0", 0L));
            topologyTestDriver.close();
        } catch (Throwable th) {
            try {
                topologyTestDriver.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void testLeftJoinDuplicates() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        MockApiProcessorSupplier mockApiProcessorSupplier = new MockApiProcessorSupplier();
        streamsBuilder.stream(AssignmentTestUtils.TP_1_NAME, this.consumed).leftJoin(streamsBuilder.stream(AssignmentTestUtils.TP_2_NAME, this.consumed), MockValueJoiner.TOSTRING_JOINER, JoinWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(100L), Duration.ofMillis(10L)), StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())).process(mockApiProcessorSupplier, new String[0]);
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(streamsBuilder.build(), PROPS);
        try {
            TestInputTopic createInputTopic = topologyTestDriver.createInputTopic(AssignmentTestUtils.TP_1_NAME, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            TestInputTopic createInputTopic2 = topologyTestDriver.createInputTopic(AssignmentTestUtils.TP_2_NAME, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            MockApiProcessor theCapturedProcessor = mockApiProcessorSupplier.theCapturedProcessor();
            createInputTopic.pipeInput(0, "A0", 0L);
            createInputTopic.pipeInput(0, "A0-0", 0L);
            createInputTopic2.pipeInput(1, "a0", 111L);
            createInputTopic2.pipeInput(2, "dummy", 500L);
            theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "A0+null", 0L), new KeyValueTimestamp<>(0, "A0-0+null", 0L));
            createInputTopic.pipeInput(2, "A2", 1000L);
            createInputTopic.pipeInput(2, "A2-0", 1000L);
            createInputTopic2.pipeInput(2, "a2", 1001L);
            theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>(2, "A2+a2", 1001L), new KeyValueTimestamp<>(2, "A2-0+a2", 1001L));
            createInputTopic2.pipeInput(3, "a3", 315L);
            theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp[0]);
            topologyTestDriver.close();
        } catch (Throwable th) {
            try {
                topologyTestDriver.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void testLeftExpiredNonJoinedRecordsAreEmittedByTheLeftProcessor() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        MockApiProcessorSupplier mockApiProcessorSupplier = new MockApiProcessorSupplier();
        streamsBuilder.stream(AssignmentTestUtils.TP_1_NAME, this.consumed).leftJoin(streamsBuilder.stream(AssignmentTestUtils.TP_2_NAME, this.consumed), MockValueJoiner.TOSTRING_JOINER, JoinWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(100L), Duration.ofMillis(0L)), StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())).process(mockApiProcessorSupplier, new String[0]);
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(streamsBuilder.build(), PROPS);
        try {
            TestInputTopic createInputTopic = topologyTestDriver.createInputTopic(AssignmentTestUtils.TP_1_NAME, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            TestInputTopic createInputTopic2 = topologyTestDriver.createInputTopic(AssignmentTestUtils.TP_2_NAME, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            MockApiProcessor theCapturedProcessor = mockApiProcessorSupplier.theCapturedProcessor();
            createInputTopic.pipeInput(0, "A0", 1L);
            createInputTopic.pipeInput(1, "A1", 2L);
            createInputTopic.pipeInput(0, "A0-0", 3L);
            theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp[0]);
            createInputTopic2.pipeInput(1, "a1", 3L);
            theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>(1, "A1+a1", 3L));
            createInputTopic.pipeInput(2, "dummy", 401L);
            theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "A0+null", 1L), new KeyValueTimestamp<>(0, "A0-0+null", 3L));
            createInputTopic2.pipeInput(2, "dummy", 401L);
            theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>(2, "dummy+dummy", 401L));
            topologyTestDriver.close();
        } catch (Throwable th) {
            try {
                topologyTestDriver.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void testLeftExpiredNonJoinedRecordsAreEmittedByTheRightProcessor() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        MockApiProcessorSupplier mockApiProcessorSupplier = new MockApiProcessorSupplier();
        streamsBuilder.stream(AssignmentTestUtils.TP_1_NAME, this.consumed).leftJoin(streamsBuilder.stream(AssignmentTestUtils.TP_2_NAME, this.consumed), MockValueJoiner.TOSTRING_JOINER, JoinWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(100L), Duration.ofMillis(0L)), StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())).process(mockApiProcessorSupplier, new String[0]);
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(streamsBuilder.build(), PROPS);
        try {
            TestInputTopic createInputTopic = topologyTestDriver.createInputTopic(AssignmentTestUtils.TP_1_NAME, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            TestInputTopic createInputTopic2 = topologyTestDriver.createInputTopic(AssignmentTestUtils.TP_2_NAME, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            MockApiProcessor theCapturedProcessor = mockApiProcessorSupplier.theCapturedProcessor();
            createInputTopic.pipeInput(0, "A0", 1L);
            createInputTopic.pipeInput(1, "A1", 2L);
            createInputTopic.pipeInput(0, "A0-0", 3L);
            theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp[0]);
            createInputTopic2.pipeInput(1, "a1", 3L);
            theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>(1, "A1+a1", 3L));
            createInputTopic2.pipeInput(2, "dummy", 401L);
            theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "A0+null", 1L), new KeyValueTimestamp<>(0, "A0-0+null", 3L));
            createInputTopic.pipeInput(2, "dummy", 402L);
            theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>(2, "dummy+dummy", 402L));
            topologyTestDriver.close();
        } catch (Throwable th) {
            try {
                topologyTestDriver.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void testRightNonJoinedRecordsAreNeverEmittedByTheLeftProcessor() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        MockApiProcessorSupplier mockApiProcessorSupplier = new MockApiProcessorSupplier();
        streamsBuilder.stream(AssignmentTestUtils.TP_1_NAME, this.consumed).leftJoin(streamsBuilder.stream(AssignmentTestUtils.TP_2_NAME, this.consumed), MockValueJoiner.TOSTRING_JOINER, JoinWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(100L), Duration.ofMillis(0L)), StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())).process(mockApiProcessorSupplier, new String[0]);
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(streamsBuilder.build(), PROPS);
        try {
            TestInputTopic createInputTopic = topologyTestDriver.createInputTopic(AssignmentTestUtils.TP_1_NAME, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            TestInputTopic createInputTopic2 = topologyTestDriver.createInputTopic(AssignmentTestUtils.TP_2_NAME, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            MockApiProcessor theCapturedProcessor = mockApiProcessorSupplier.theCapturedProcessor();
            createInputTopic2.pipeInput(0, "A0", 1L);
            createInputTopic2.pipeInput(1, "A1", 2L);
            createInputTopic2.pipeInput(0, "A0-0", 3L);
            theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp[0]);
            createInputTopic.pipeInput(1, "a1", 3L);
            theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>(1, "a1+A1", 3L));
            createInputTopic.pipeInput(2, "dummy", 401L);
            theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp[0]);
            createInputTopic2.pipeInput(2, "dummy", 402L);
            theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>(2, "dummy+dummy", 402L));
            topologyTestDriver.close();
        } catch (Throwable th) {
            try {
                topologyTestDriver.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void testRightNonJoinedRecordsAreNeverEmittedByTheRightProcessor() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        MockApiProcessorSupplier mockApiProcessorSupplier = new MockApiProcessorSupplier();
        streamsBuilder.stream(AssignmentTestUtils.TP_1_NAME, this.consumed).leftJoin(streamsBuilder.stream(AssignmentTestUtils.TP_2_NAME, this.consumed), MockValueJoiner.TOSTRING_JOINER, JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100L)), StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())).process(mockApiProcessorSupplier, new String[0]);
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(streamsBuilder.build(), PROPS);
        try {
            TestInputTopic createInputTopic = topologyTestDriver.createInputTopic(AssignmentTestUtils.TP_1_NAME, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            TestInputTopic createInputTopic2 = topologyTestDriver.createInputTopic(AssignmentTestUtils.TP_2_NAME, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            MockApiProcessor theCapturedProcessor = mockApiProcessorSupplier.theCapturedProcessor();
            createInputTopic2.pipeInput(0, "A0", 1L);
            createInputTopic2.pipeInput(1, "A1", 2L);
            createInputTopic2.pipeInput(0, "A0-0", 3L);
            theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp[0]);
            createInputTopic.pipeInput(1, "a1", 3L);
            theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>(1, "a1+A1", 3L));
            createInputTopic2.pipeInput(2, "dummy", 401L);
            theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp[0]);
            createInputTopic.pipeInput(2, "dummy", 402L);
            theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>(2, "dummy+dummy", 402L));
            topologyTestDriver.close();
        } catch (Throwable th) {
            try {
                topologyTestDriver.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void testLeftJoinedRecordsWithZeroAfterAreEmitted() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        int[] iArr = {0, 1, 2, 3};
        MockApiProcessorSupplier mockApiProcessorSupplier = new MockApiProcessorSupplier();
        streamsBuilder.stream(AssignmentTestUtils.TP_1_NAME, this.consumed).leftJoin(streamsBuilder.stream(AssignmentTestUtils.TP_2_NAME, this.consumed), MockValueJoiner.TOSTRING_JOINER, JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100L)).after(Duration.ZERO), StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())).process(mockApiProcessorSupplier, new String[0]);
        Collection copartitionGroups = TopologyWrapper.getInternalTopologyBuilder(streamsBuilder.build()).copartitionGroups();
        Assertions.assertEquals(1, copartitionGroups.size());
        Assertions.assertEquals(new HashSet(Arrays.asList(AssignmentTestUtils.TP_1_NAME, AssignmentTestUtils.TP_2_NAME)), copartitionGroups.iterator().next());
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(streamsBuilder.build(), PROPS);
        try {
            TestInputTopic createInputTopic = topologyTestDriver.createInputTopic(AssignmentTestUtils.TP_1_NAME, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            TestInputTopic createInputTopic2 = topologyTestDriver.createInputTopic(AssignmentTestUtils.TP_2_NAME, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            MockApiProcessor theCapturedProcessor = mockApiProcessorSupplier.theCapturedProcessor();
            theCapturedProcessor.init(null);
            for (int i = 0; i < iArr.length; i++) {
                createInputTopic.pipeInput(Integer.valueOf(iArr[i]), "A" + iArr[i], 1000 + i);
            }
            theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "A0+null", 1000L), new KeyValueTimestamp<>(1, "A1+null", 1001L), new KeyValueTimestamp<>(2, "A2+null", 1002L));
            for (int i2 : iArr) {
                createInputTopic2.pipeInput(Integer.valueOf(i2), "a" + i2, 999L);
            }
            theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "A0+a0", 1000L), new KeyValueTimestamp<>(1, "A1+a1", 1001L), new KeyValueTimestamp<>(2, "A2+a2", 1002L), new KeyValueTimestamp<>(3, "A3+a3", 1003L));
            long j = 999 + 1;
            for (int i3 : iArr) {
                createInputTopic2.pipeInput(Integer.valueOf(i3), "b" + i3, j);
            }
            theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "A0+b0", 1000L), new KeyValueTimestamp<>(1, "A1+b1", 1001L), new KeyValueTimestamp<>(2, "A2+b2", 1002L), new KeyValueTimestamp<>(3, "A3+b3", 1003L));
            long j2 = j + 1;
            for (int i4 : iArr) {
                createInputTopic2.pipeInput(Integer.valueOf(i4), "c" + i4, j2);
            }
            theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>(1, "A1+c1", 1001L), new KeyValueTimestamp<>(2, "A2+c2", 1002L), new KeyValueTimestamp<>(3, "A3+c3", 1003L));
            long j3 = j2 + 1;
            for (int i5 : iArr) {
                createInputTopic2.pipeInput(Integer.valueOf(i5), "d" + i5, j3);
            }
            theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>(2, "A2+d2", 1002L), new KeyValueTimestamp<>(3, "A3+d3", 1003L));
            long j4 = j3 + 1;
            for (int i6 : iArr) {
                createInputTopic2.pipeInput(Integer.valueOf(i6), "e" + i6, j4);
            }
            theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>(3, "A3+e3", 1003L));
            for (int i7 : iArr) {
                createInputTopic2.pipeInput(Integer.valueOf(i7), "f" + i7, 1100L);
            }
            theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp[0]);
            topologyTestDriver.close();
        } catch (Throwable th) {
            try {
                topologyTestDriver.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void testLeftJoinWithInMemoryCustomSuppliers() {
        JoinWindows ofTimeDifferenceAndGrace = JoinWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(100L), Duration.ofMillis(0L));
        WindowBytesStoreSupplier inMemoryWindowStore = Stores.inMemoryWindowStore("in-memory-join-store", Duration.ofMillis(ofTimeDifferenceAndGrace.size() + ofTimeDifferenceAndGrace.gracePeriodMs()), Duration.ofMillis(ofTimeDifferenceAndGrace.size()), true);
        runLeftJoin(StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()).withThisStoreSupplier(inMemoryWindowStore).withOtherStoreSupplier(Stores.inMemoryWindowStore("in-memory-join-store-other", Duration.ofMillis(ofTimeDifferenceAndGrace.size() + ofTimeDifferenceAndGrace.gracePeriodMs()), Duration.ofMillis(ofTimeDifferenceAndGrace.size()), true)), ofTimeDifferenceAndGrace);
    }

    @Test
    public void testLeftJoinWithDefaultSuppliers() {
        runLeftJoin(StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String()), JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100L)));
    }

    public void runLeftJoin(StreamJoined<Integer, String, String> streamJoined, JoinWindows joinWindows) {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        int[] iArr = {0, 1, 2, 3};
        MockApiProcessorSupplier mockApiProcessorSupplier = new MockApiProcessorSupplier();
        streamsBuilder.stream(AssignmentTestUtils.TP_1_NAME, this.consumed).leftJoin(streamsBuilder.stream(AssignmentTestUtils.TP_2_NAME, this.consumed), MockValueJoiner.TOSTRING_JOINER, joinWindows, streamJoined).process(mockApiProcessorSupplier, new String[0]);
        Collection copartitionGroups = TopologyWrapper.getInternalTopologyBuilder(streamsBuilder.build()).copartitionGroups();
        Assertions.assertEquals(1, copartitionGroups.size());
        Assertions.assertEquals(new HashSet(Arrays.asList(AssignmentTestUtils.TP_1_NAME, AssignmentTestUtils.TP_2_NAME)), copartitionGroups.iterator().next());
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(streamsBuilder.build(), PROPS);
        try {
            TestInputTopic createInputTopic = topologyTestDriver.createInputTopic(AssignmentTestUtils.TP_1_NAME, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            TestInputTopic createInputTopic2 = topologyTestDriver.createInputTopic(AssignmentTestUtils.TP_2_NAME, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            MockApiProcessor theCapturedProcessor = mockApiProcessorSupplier.theCapturedProcessor();
            Assertions.assertEquals(3, topologyTestDriver.getAllStateStores().size());
            for (int i = 0; i < 2; i++) {
                createInputTopic.pipeInput(Integer.valueOf(iArr[i]), "A" + iArr[i]);
            }
            theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp[0]);
            for (int i2 = 0; i2 < 2; i2++) {
                createInputTopic2.pipeInput(Integer.valueOf(iArr[i2]), "a" + iArr[i2]);
            }
            theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "A0+a0", 0L), new KeyValueTimestamp<>(1, "A1+a1", 0L));
            for (int i3 = 0; i3 < 3; i3++) {
                createInputTopic.pipeInput(Integer.valueOf(iArr[i3]), "B" + iArr[i3]);
            }
            theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "B0+a0", 0L), new KeyValueTimestamp<>(1, "B1+a1", 0L));
            for (int i4 : iArr) {
                createInputTopic2.pipeInput(Integer.valueOf(i4), "b" + i4);
            }
            theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "A0+b0", 0L), new KeyValueTimestamp<>(0, "B0+b0", 0L), new KeyValueTimestamp<>(1, "A1+b1", 0L), new KeyValueTimestamp<>(1, "B1+b1", 0L), new KeyValueTimestamp<>(2, "B2+b2", 0L));
            for (int i5 : iArr) {
                createInputTopic.pipeInput(Integer.valueOf(i5), "C" + i5);
            }
            theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "C0+a0", 0L), new KeyValueTimestamp<>(0, "C0+b0", 0L), new KeyValueTimestamp<>(1, "C1+a1", 0L), new KeyValueTimestamp<>(1, "C1+b1", 0L), new KeyValueTimestamp<>(2, "C2+b2", 0L), new KeyValueTimestamp<>(3, "C3+b3", 0L));
            createInputTopic.pipeInput(0, "dummy", 1000L);
            theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp[0]);
            topologyTestDriver.close();
        } catch (Throwable th) {
            try {
                topologyTestDriver.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void testOrdering() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        MockApiProcessorSupplier mockApiProcessorSupplier = new MockApiProcessorSupplier();
        streamsBuilder.stream(AssignmentTestUtils.TP_1_NAME, this.consumed).leftJoin(streamsBuilder.stream(AssignmentTestUtils.TP_2_NAME, this.consumed), MockValueJoiner.TOSTRING_JOINER, JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100L)), StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())).process(mockApiProcessorSupplier, new String[0]);
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(streamsBuilder.build(), PROPS);
        try {
            TestInputTopic createInputTopic = topologyTestDriver.createInputTopic(AssignmentTestUtils.TP_1_NAME, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            TestInputTopic createInputTopic2 = topologyTestDriver.createInputTopic(AssignmentTestUtils.TP_2_NAME, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            MockApiProcessor theCapturedProcessor = mockApiProcessorSupplier.theCapturedProcessor();
            createInputTopic.pipeInput(0, "A0", 0L);
            createInputTopic.pipeInput(1, "A1", 100L);
            theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp[0]);
            createInputTopic2.pipeInput(1, "a1", 110L);
            theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "A0+null", 0L), new KeyValueTimestamp<>(1, "A1+a1", 110L));
            topologyTestDriver.close();
        } catch (Throwable th) {
            try {
                topologyTestDriver.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void testGracePeriod() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        int[] iArr = {0, 1, 2, 3};
        MockApiProcessorSupplier mockApiProcessorSupplier = new MockApiProcessorSupplier();
        streamsBuilder.stream(AssignmentTestUtils.TP_1_NAME, this.consumed).leftJoin(streamsBuilder.stream(AssignmentTestUtils.TP_2_NAME, this.consumed), MockValueJoiner.TOSTRING_JOINER, JoinWindows.ofTimeDifferenceAndGrace(Duration.ofMillis(100L), Duration.ofMillis(10L)), StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())).process(mockApiProcessorSupplier, new String[0]);
        Collection copartitionGroups = TopologyWrapper.getInternalTopologyBuilder(streamsBuilder.build()).copartitionGroups();
        Assertions.assertEquals(1, copartitionGroups.size());
        Assertions.assertEquals(new HashSet(Arrays.asList(AssignmentTestUtils.TP_1_NAME, AssignmentTestUtils.TP_2_NAME)), copartitionGroups.iterator().next());
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(streamsBuilder.build(), PROPS);
        try {
            TestInputTopic createInputTopic = topologyTestDriver.createInputTopic(AssignmentTestUtils.TP_1_NAME, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            TestInputTopic createInputTopic2 = topologyTestDriver.createInputTopic(AssignmentTestUtils.TP_2_NAME, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            MockApiProcessor theCapturedProcessor = mockApiProcessorSupplier.theCapturedProcessor();
            for (int i = 0; i < 2; i++) {
                createInputTopic.pipeInput(Integer.valueOf(iArr[i]), "A" + iArr[i], 0L);
            }
            theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp[0]);
            long j = 0 + 101;
            for (int i2 : iArr) {
                createInputTopic2.pipeInput(Integer.valueOf(i2), "a" + i2, j);
            }
            theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp[0]);
            createInputTopic2.pipeInput(0, "dummy", j + 1100);
            theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "A0+null", 0L), new KeyValueTimestamp<>(1, "A1+null", 0L));
            topologyTestDriver.close();
        } catch (Throwable th) {
            try {
                topologyTestDriver.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void testWindowing() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        int[] iArr = {0, 1, 2, 3};
        MockApiProcessorSupplier mockApiProcessorSupplier = new MockApiProcessorSupplier();
        streamsBuilder.stream(AssignmentTestUtils.TP_1_NAME, this.consumed).leftJoin(streamsBuilder.stream(AssignmentTestUtils.TP_2_NAME, this.consumed), MockValueJoiner.TOSTRING_JOINER, JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100L)), StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())).process(mockApiProcessorSupplier, new String[0]);
        Collection copartitionGroups = TopologyWrapper.getInternalTopologyBuilder(streamsBuilder.build()).copartitionGroups();
        Assertions.assertEquals(1, copartitionGroups.size());
        Assertions.assertEquals(new HashSet(Arrays.asList(AssignmentTestUtils.TP_1_NAME, AssignmentTestUtils.TP_2_NAME)), copartitionGroups.iterator().next());
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(streamsBuilder.build(), PROPS);
        try {
            TestInputTopic createInputTopic = topologyTestDriver.createInputTopic(AssignmentTestUtils.TP_1_NAME, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            TestInputTopic createInputTopic2 = topologyTestDriver.createInputTopic(AssignmentTestUtils.TP_2_NAME, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            MockApiProcessor<Integer, String, Void, Void> theCapturedProcessor = mockApiProcessorSupplier.theCapturedProcessor();
            for (int i = 0; i < 2; i++) {
                createInputTopic.pipeInput(Integer.valueOf(iArr[i]), "A" + iArr[i], 0L);
            }
            theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp[0]);
            for (int i2 : iArr) {
                createInputTopic2.pipeInput(Integer.valueOf(i2), "a" + i2, 0L);
            }
            theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "A0+a0", 0L), new KeyValueTimestamp<>(1, "A1+a1", 0L));
            testUpperWindowBound(iArr, topologyTestDriver, theCapturedProcessor);
            testLowerWindowBound(iArr, topologyTestDriver, theCapturedProcessor);
            topologyTestDriver.close();
        } catch (Throwable th) {
            try {
                topologyTestDriver.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void shouldNotEmitLeftJoinResultForAsymmetricWindow() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        int[] iArr = {0, 1, 2, 3};
        MockApiProcessorSupplier mockApiProcessorSupplier = new MockApiProcessorSupplier();
        streamsBuilder.stream(AssignmentTestUtils.TP_1_NAME, this.consumed).leftJoin(streamsBuilder.stream(AssignmentTestUtils.TP_2_NAME, this.consumed), MockValueJoiner.TOSTRING_JOINER, JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100L)).before(Duration.ZERO), StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())).process(mockApiProcessorSupplier, new String[0]);
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(streamsBuilder.build(), PROPS);
        try {
            TestInputTopic createInputTopic = topologyTestDriver.createInputTopic(AssignmentTestUtils.TP_1_NAME, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            TestInputTopic createInputTopic2 = topologyTestDriver.createInputTopic(AssignmentTestUtils.TP_2_NAME, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
            MockApiProcessor theCapturedProcessor = mockApiProcessorSupplier.theCapturedProcessor();
            for (int i = 0; i < 2; i++) {
                createInputTopic.pipeInput(Integer.valueOf(iArr[i]), "A" + iArr[i], 0 + i);
            }
            theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp[0]);
            long j = 0 + 100;
            createInputTopic2.pipeInput(Integer.valueOf(iArr[0]), "a" + iArr[0], j);
            theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "A0+a0", 100L));
            createInputTopic2.pipeInput(Integer.valueOf(iArr[1]), "a" + iArr[1], j + 2);
            theCapturedProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>(1, "A1+null", 1L));
            topologyTestDriver.close();
        } catch (Throwable th) {
            try {
                topologyTestDriver.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    private void testUpperWindowBound(int[] iArr, TopologyTestDriver topologyTestDriver, MockApiProcessor<Integer, String, Void, Void> mockApiProcessor) {
        TestInputTopic createInputTopic = topologyTestDriver.createInputTopic(AssignmentTestUtils.TP_1_NAME, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
        TestInputTopic createInputTopic2 = topologyTestDriver.createInputTopic(AssignmentTestUtils.TP_2_NAME, new IntegerSerializer(), new StringSerializer(), Instant.ofEpochMilli(0L), Duration.ZERO);
        for (int i = 0; i < iArr.length; i++) {
            createInputTopic2.pipeInput(Integer.valueOf(iArr[i]), "b" + iArr[i], 1000 + i);
        }
        mockApiProcessor.checkAndClearProcessResult(EMPTY);
        for (int i2 : iArr) {
            createInputTopic.pipeInput(Integer.valueOf(i2), "B" + i2, 1100L);
        }
        mockApiProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "B0+b0", 1100L), new KeyValueTimestamp<>(1, "B1+b1", 1100L), new KeyValueTimestamp<>(2, "B2+b2", 1100L), new KeyValueTimestamp<>(3, "B3+b3", 1100L));
        long j = 1100 + 1;
        for (int i3 : iArr) {
            createInputTopic.pipeInput(Integer.valueOf(i3), "C" + i3, j);
        }
        mockApiProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>(1, "C1+b1", 1101L), new KeyValueTimestamp<>(2, "C2+b2", 1101L), new KeyValueTimestamp<>(3, "C3+b3", 1101L));
        long j2 = j + 1;
        for (int i4 : iArr) {
            createInputTopic.pipeInput(Integer.valueOf(i4), "D" + i4, j2);
        }
        mockApiProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>(2, "D2+b2", 1102L), new KeyValueTimestamp<>(3, "D3+b3", 1102L));
        long j3 = j2 + 1;
        for (int i5 : iArr) {
            createInputTopic.pipeInput(Integer.valueOf(i5), "E" + i5, j3);
        }
        mockApiProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>(3, "E3+b3", 1103L));
        long j4 = j3 + 1;
        for (int i6 : iArr) {
            createInputTopic.pipeInput(Integer.valueOf(i6), "F" + i6, j4);
        }
        mockApiProcessor.checkAndClearProcessResult(new KeyValueTimestamp[0]);
        createInputTopic.pipeInput(0, "dummy", j4 + 301);
        mockApiProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "C0+null", 1101L), new KeyValueTimestamp<>(0, "D0+null", 1102L), new KeyValueTimestamp<>(1, "D1+null", 1102L), new KeyValueTimestamp<>(0, "E0+null", 1103L), new KeyValueTimestamp<>(1, "E1+null", 1103L), new KeyValueTimestamp<>(2, "E2+null", 1103L), new KeyValueTimestamp<>(0, "F0+null", 1104L), new KeyValueTimestamp<>(1, "F1+null", 1104L), new KeyValueTimestamp<>(2, "F2+null", 1104L), new KeyValueTimestamp<>(3, "F3+null", 1104L));
    }

    private void testLowerWindowBound(int[] iArr, TopologyTestDriver topologyTestDriver, MockApiProcessor<Integer, String, Void, Void> mockApiProcessor) {
        TestInputTopic createInputTopic = topologyTestDriver.createInputTopic(AssignmentTestUtils.TP_1_NAME, new IntegerSerializer(), new StringSerializer());
        for (int i : iArr) {
            createInputTopic.pipeInput(Integer.valueOf(i), "G" + i, 899L);
        }
        mockApiProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "G0+null", 899L), new KeyValueTimestamp<>(1, "G1+null", 899L), new KeyValueTimestamp<>(2, "G2+null", 899L), new KeyValueTimestamp<>(3, "G3+null", 899L));
        long j = 899 + 1;
        for (int i2 : iArr) {
            createInputTopic.pipeInput(Integer.valueOf(i2), "H" + i2, j);
        }
        mockApiProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "H0+b0", 1000L), new KeyValueTimestamp<>(1, "H1+null", 900L), new KeyValueTimestamp<>(2, "H2+null", 900L), new KeyValueTimestamp<>(3, "H3+null", 900L));
        long j2 = j + 1;
        for (int i3 : iArr) {
            createInputTopic.pipeInput(Integer.valueOf(i3), "I" + i3, j2);
        }
        mockApiProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "I0+b0", 1000L), new KeyValueTimestamp<>(1, "I1+b1", 1001L), new KeyValueTimestamp<>(2, "I2+null", 901L), new KeyValueTimestamp<>(3, "I3+null", 901L));
        long j3 = j2 + 1;
        for (int i4 : iArr) {
            createInputTopic.pipeInput(Integer.valueOf(i4), "J" + i4, j3);
        }
        mockApiProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "J0+b0", 1000L), new KeyValueTimestamp<>(1, "J1+b1", 1001L), new KeyValueTimestamp<>(2, "J2+b2", 1002L), new KeyValueTimestamp<>(3, "J3+null", 902L));
        long j4 = j3 + 1;
        for (int i5 : iArr) {
            createInputTopic.pipeInput(Integer.valueOf(i5), "K" + i5, j4);
        }
        mockApiProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "K0+b0", 1000L), new KeyValueTimestamp<>(1, "K1+b1", 1001L), new KeyValueTimestamp<>(2, "K2+b2", 1002L), new KeyValueTimestamp<>(3, "K3+b3", 1003L));
        createInputTopic.pipeInput(0, "dummy", j4 + 300);
        mockApiProcessor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "dummy+null", 1203L));
    }
}
