package org.apache.kafka.streams.integration;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.test.TestRecord;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

@Tag("integration")
@Timeout(600)
/* loaded from: input_file:org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.class */
public class StreamTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
    private static final String STORE_NAME = "table-store";
    private static final String APP_ID = "stream-table-join-integration-test";

    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testInner(boolean z) {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KStream stream = streamsBuilder.stream("inputTopicLeft");
        KTable table = streamsBuilder.table("inputTopicRight");
        Properties properties = setupConfigsAndUtils(z);
        properties.put("application.id", "stream-table-join-integration-test-inner");
        stream.join(table, this.valueJoiner).to("outputTopic");
        runTestWithDriver(this.input, Arrays.asList(null, null, null, null, Collections.singletonList(new TestRecord(0L, "B-a", (Headers) null, 5L)), null, null, null, null, null, null, null, null, null, Collections.singletonList(new TestRecord(0L, "D-d", (Headers) null, 6L)), null, null, null, Collections.singletonList(new TestRecord(0L, "E-e", (Headers) null, 15L)), null, null, null, null), properties, streamsBuilder.build(properties));
    }

    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testLeft(boolean z) {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KStream stream = streamsBuilder.stream("inputTopicLeft");
        KTable table = streamsBuilder.table("inputTopicRight");
        Properties properties = setupConfigsAndUtils(z);
        properties.put("application.id", "stream-table-join-integration-test-left");
        stream.leftJoin(table, this.valueJoiner).to("outputTopic");
        runTestWithDriver(this.input, Arrays.asList(null, null, Collections.singletonList(new TestRecord(0L, "A-null", (Headers) null, 3L)), null, Collections.singletonList(new TestRecord(0L, "B-a", (Headers) null, 5L)), null, null, null, Collections.singletonList(new TestRecord(0L, "C-null", (Headers) null, 9L)), null, null, null, null, null, Collections.singletonList(new TestRecord(0L, "D-d", (Headers) null, 6L)), null, null, null, Collections.singletonList(new TestRecord(0L, "E-e", (Headers) null, 15L)), null, null, Collections.singletonList(new TestRecord(0L, "F-null", (Headers) null, 4L)), null), properties, streamsBuilder.build(properties));
    }

    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testInnerWithVersionedStore(boolean z) {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KStream stream = streamsBuilder.stream("inputTopicLeft");
        KTable table = streamsBuilder.table("inputTopicRight", Materialized.as(Stores.persistentVersionedKeyValueStore(STORE_NAME, Duration.ofMinutes(5L))));
        Properties properties = setupConfigsAndUtils(z);
        properties.put("application.id", "stream-table-join-integration-test-inner");
        stream.join(table, this.valueJoiner).to("outputTopic");
        runTestWithDriver(this.input, Arrays.asList(null, null, null, null, Collections.singletonList(new TestRecord(0L, "B-a", (Headers) null, 5L)), null, null, null, null, null, null, null, null, null, Collections.singletonList(new TestRecord(0L, "D-b", (Headers) null, 6L)), null, null, null, Collections.singletonList(new TestRecord(0L, "E-e", (Headers) null, 15L)), null, null, Collections.singletonList(new TestRecord(0L, "F-a", (Headers) null, 4L)), null), properties, streamsBuilder.build(properties));
    }

    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testLeftWithVersionedStore(boolean z) {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KStream stream = streamsBuilder.stream("inputTopicLeft");
        KTable table = streamsBuilder.table("inputTopicRight", Materialized.as(Stores.persistentVersionedKeyValueStore(STORE_NAME, Duration.ofMinutes(5L))));
        Properties properties = setupConfigsAndUtils(z);
        properties.put("application.id", "stream-table-join-integration-test-left");
        stream.leftJoin(table, this.valueJoiner).to("outputTopic");
        runTestWithDriver(this.input, Arrays.asList(null, null, Collections.singletonList(new TestRecord(0L, "A-null", (Headers) null, 3L)), null, Collections.singletonList(new TestRecord(0L, "B-a", (Headers) null, 5L)), null, null, null, Collections.singletonList(new TestRecord(0L, "C-null", (Headers) null, 9L)), null, null, null, null, null, Collections.singletonList(new TestRecord(0L, "D-b", (Headers) null, 6L)), null, null, null, Collections.singletonList(new TestRecord(0L, "E-e", (Headers) null, 15L)), null, null, Collections.singletonList(new TestRecord(0L, "F-a", (Headers) null, 4L)), null), properties, streamsBuilder.build(properties));
    }
}
