package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.util.Objects;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreamsWrapper;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

/**
 * Integration test that runs a Kafka Streams topology whose left join input is a topic
 * that this test never creates ({@link #NON_EXISTENT_INPUT_TOPIC_LEFT}), and checks that
 * the registered stream-thread state listener observes a transition to PENDING_SHUTDOWN.
 *
 * <p>A single-broker {@link EmbeddedKafkaCluster} is shared by all tests in the class;
 * topics and local state are created before and removed after every test.
 */
@Tag("integration")
@Timeout(600)
public class JoinWithIncompleteMetadataIntegrationTest {
    private static final String APP_ID = "join-incomplete-metadata-integration-test";
    static final String INPUT_TOPIC_RIGHT = "inputTopicRight";
    // Deliberately absent from the topics created in prepareTopology().
    static final String NON_EXISTENT_INPUT_TOPIC_LEFT = "inputTopicLeft-not-exist";
    static final String OUTPUT_TOPIC = "outputTopic";
    StreamsBuilder builder;
    /** Joins the left and right values as {@code left + "-" + right}. */
    final ValueJoiner<String, String, String> valueJoiner = (left, right) -> left + "-" + right;
    private KTable<Long, String> rightTable;
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
    private static final Long COMMIT_INTERVAL = 100L;
    /** Shared, mutable streams configuration; per-test entries are overwritten on each run. */
    static final Properties STREAMS_CONFIG = new Properties();

    /**
     * Starts the embedded cluster and fills in the configuration entries that are the same
     * for every test.
     *
     * @throws IOException if the embedded cluster fails to start
     */
    @BeforeAll
    public static void startCluster() throws IOException {
        CLUSTER.start();
        STREAMS_CONFIG.put("auto.offset.reset", "earliest");
        STREAMS_CONFIG.put("bootstrap.servers", CLUSTER.bootstrapServers());
        STREAMS_CONFIG.put("default.key.serde", Serdes.Long().getClass());
        STREAMS_CONFIG.put("default.value.serde", Serdes.String().getClass());
        STREAMS_CONFIG.put("commit.interval.ms", COMMIT_INTERVAL);
    }

    /** Stops the embedded cluster once all tests in the class have run. */
    @AfterAll
    public static void closeCluster() {
        CLUSTER.stop();
    }

    /**
     * Creates the right input topic and the output topic, points the state directory at a
     * fresh temporary directory, and starts a new topology containing only the right table.
     *
     * @throws InterruptedException if interrupted while creating the topics
     */
    @BeforeEach
    public void prepareTopology() throws InterruptedException {
        CLUSTER.createTopics(INPUT_TOPIC_RIGHT, OUTPUT_TOPIC);
        STREAMS_CONFIG.put("state.dir", TestUtils.tempDirectory().getPath());
        this.builder = new StreamsBuilder();
        this.rightTable = this.builder.table(INPUT_TOPIC_RIGHT);
    }

    /**
     * Deletes every topic on the cluster (waiting up to 120 seconds) and purges the local
     * streams state referenced by {@link #STREAMS_CONFIG}.
     *
     * @throws InterruptedException if interrupted while waiting for topic deletion
     * @throws IOException if purging the local state fails
     */
    @AfterEach
    public void cleanup() throws InterruptedException, IOException {
        CLUSTER.deleteAllTopicsAndWait(120000L);
        IntegrationTestUtils.purgeLocalStreamsState(STREAMS_CONFIG);
    }

    /**
     * Builds {@code left-stream LEFT JOIN right-table -> groupBy key -> reduce -> output},
     * where the left stream reads from a topic that was never created, starts the
     * application, and waits until the state listener reports a transition to
     * PENDING_SHUTDOWN.
     *
     * @throws InterruptedException if interrupted while waiting for the state transition
     */
    @Test
    public void testShouldAutoShutdownOnJoinWithIncompleteMetadata() throws InterruptedException {
        STREAMS_CONFIG.put("application.id", APP_ID);
        STREAMS_CONFIG.put("bootstrap.servers", CLUSTER.bootstrapServers());

        // The explicit <Long, String> witness is required: as the receiver of a method chain,
        // stream(...) has no target type, so K/V would otherwise be inferred as Object and the
        // join against KTable<Long, String> would not type-check.
        this.builder.<Long, String>stream(NON_EXISTENT_INPUT_TOPIC_LEFT)
                .leftJoin(this.rightTable, this.valueJoiner)
                .groupBy((key, value) -> key)
                .reduce((aggregate, value) -> aggregate + value)
                .toStream()
                .to(OUTPUT_TOPIC);

        KafkaStreamsWrapper kafkaStreamsWrapper = new KafkaStreamsWrapper(this.builder.build(), STREAMS_CONFIG);
        IntegrationTestUtils.StateListenerStub stateListenerStub = new IntegrationTestUtils.StateListenerStub();
        kafkaStreamsWrapper.setStreamThreadStateListener(stateListenerStub);
        kafkaStreamsWrapper.start();
        try {
            TestUtils.waitForCondition(stateListenerStub::transitToPendingShutdownSeen, "Did not seen thread state transited to PENDING_SHUTDOWN");
        } finally {
            // Always close the instance, even when the wait times out, so that a failing run
            // does not leave a running streams application behind for cleanup() and later tests.
            kafkaStreamsWrapper.close();
        }
        Assertions.assertTrue(stateListenerStub.transitToPendingShutdownSeen());
    }
}
