package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.server.util.MockTime;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockKeyValueStoreBuilder;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;

@Tag("integration")
@Timeout(600)
/* loaded from: input_file:org/apache/kafka/streams/integration/RegexSourceIntegrationTest.class */
public class RegexSourceIntegrationTest {
    /** Clock of the shared embedded cluster; handed to the value-producing helpers in the tests. */
    private final MockTime mockTime = CLUSTER.time;

    // Input topics created up front in startCluster() through CLUSTER.createTopics(...).
    private static final String TOPIC_1 = "topic-1";
    private static final String TOPIC_2 = "topic-2";
    private static final String TOPIC_A = "topic-A";
    private static final String TOPIC_C = "topic-C";
    private static final String TOPIC_Y = "topic-Y";
    private static final String TOPIC_Z = "topic-Z";
    private static final String FA_TOPIC = "fa";
    private static final String FOO_TOPIC = "foo";

    // Input topics created in startCluster() through CLUSTER.createTopic(name, 2, NUM_BROKERS).
    private static final String PARTITIONED_TOPIC_1 = "partitioned-1";
    private static final String PARTITIONED_TOPIC_2 = "partitioned-2";

    /** Streams configuration for the current test; rebuilt in setUp(). */
    private Properties streamsConfiguration;

    /** Failure message used while waiting for an expected topic assignment. */
    private static final String STREAM_TASKS_NOT_UPDATED = "Stream tasks not updated";

    /** Streams instance under test; closed in tearDown() when it is non-null. */
    private KafkaStreams streams;

    /** Name of the output topic created for the current test in setUp(). */
    private String outputTopic;

    private static final int NUM_BROKERS = 1;

    /** Embedded cluster shared by all tests; started in startCluster() and stopped in closeCluster(). */
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);

    private static final String STRING_SERDE_CLASSNAME = Serdes.String().getClass().getName();

    /**
     * Source of the numeric suffix for per-test output topic names.
     * The reference is never reassigned, so it is final; the AtomicInteger itself
     * already provides the atomic increment that setUp() relies on.
     */
    private static final AtomicInteger topicSuffixGenerator = new AtomicInteger(0);

    /* loaded from: input_file:org/apache/kafka/streams/integration/RegexSourceIntegrationTest$TheConsumerRebalanceListener.class */
    /**
     * Rebalance listener that records topic names in a caller-supplied list and then
     * forwards every callback to a wrapped listener.
     */
    private static class TheConsumerRebalanceListener implements ConsumerRebalanceListener {
        /** List owned by the caller; updated on every assignment and revocation. */
        private final List<String> assignedTopics;
        /** Listener that receives each callback after the list has been updated. */
        private final ConsumerRebalanceListener listener;

        /**
         * @param list list that receives the topic names of assigned partitions
         * @param consumerRebalanceListener listener to delegate to
         */
        TheConsumerRebalanceListener(List<String> list, ConsumerRebalanceListener consumerRebalanceListener) {
            this.listener = consumerRebalanceListener;
            this.assignedTopics = list;
        }

        /**
         * Removes one list entry per revoked partition (matched by topic name),
         * then delegates to the wrapped listener.
         */
        public void onPartitionsRevoked(Collection<TopicPartition> collection) {
            for (final TopicPartition revoked : collection) {
                assignedTopics.remove(revoked.topic());
            }
            listener.onPartitionsRevoked(collection);
        }

        /**
         * Adds one list entry per assigned partition (its topic name), sorts the list
         * so it can be compared against an ordered expectation, then delegates.
         */
        public void onPartitionsAssigned(Collection<TopicPartition> collection) {
            for (final TopicPartition assigned : collection) {
                assignedTopics.add(assigned.topic());
            }
            Collections.sort(assignedTopics);
            listener.onPartitionsAssigned(collection);
        }
    }

    /**
     * Starts the shared embedded cluster and creates every fixed input topic the tests read from.
     */
    @BeforeAll
    public static void startCluster() throws IOException, InterruptedException {
        CLUSTER.start();
        CLUSTER.createTopics(
            TOPIC_1,
            TOPIC_2,
            TOPIC_A,
            TOPIC_C,
            TOPIC_Y,
            TOPIC_Z,
            FA_TOPIC,
            FOO_TOPIC);
        // The partitioned topics go through the three-argument createTopic overload.
        for (final String partitionedTopic : Arrays.asList(PARTITIONED_TOPIC_1, PARTITIONED_TOPIC_2)) {
            CLUSTER.createTopic(partitionedTopic, 2, NUM_BROKERS);
        }
    }

    /** Stops the shared embedded cluster once all tests in this class have run. */
    @AfterAll
    public static void closeCluster() {
        CLUSTER.stop();
    }

    /**
     * Creates a fresh output topic for the test and builds the Streams configuration
     * from the overrides below.
     *
     * @param testInfo JUnit test metadata, used to derive a unique test name for the configuration
     */
    @BeforeEach
    public void setUp(TestInfo testInfo) throws InterruptedException {
        outputTopic = createTopic(topicSuffixGenerator.incrementAndGet());

        final Properties overrides = new Properties();
        overrides.put("statestore.cache.max.bytes", 0);
        overrides.put("commit.interval.ms", 100L);
        overrides.put("metadata.max.age.ms", "1000");
        overrides.put("auto.offset.reset", "earliest");
        overrides.put("max.task.idle.ms", 0L);
        overrides.put("session.timeout.ms", 10000);

        final String uniqueTestName = IntegrationTestUtils.safeUniqueTestName(testInfo);
        streamsConfiguration = StreamsTestUtils.getStreamsConfig(
            uniqueTestName,
            CLUSTER.bootstrapServers(),
            STRING_SERDE_CLASSNAME,
            STRING_SERDE_CLASSNAME,
            overrides);
    }

    /**
     * Closes the Streams instance of the finished test, if one was created, and
     * purges the local Streams state for the test's configuration.
     */
    @AfterEach
    public void tearDown() throws IOException {
        final KafkaStreams startedStreams = streams;
        if (startedStreams != null) {
            startedStreams.close();
        }
        IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
    }

    /**
     * Verifies that a pattern-subscribed Streams application picks up a topic that is
     * created after the application has started.
     *
     * <p>The consumer's {@code subscribe(Pattern, ...)} is intercepted so that every
     * rebalance is mirrored into {@code assignedTopics}, which the test then polls.
     */
    @Test
    public void testRegexMatchesTopicsAWhenCreated() throws Exception {
        try {
            final Serde<String> stringSerde = Serdes.String();
            final List<String> expectedFirstAssignment = Collections.singletonList("TEST-TOPIC-1");
            final List<String> expectedSecondAssignment = Arrays.asList("TEST-TOPIC-1", "TEST-TOPIC-2");

            CLUSTER.createTopic("TEST-TOPIC-1");

            final StreamsBuilder builder = new StreamsBuilder();
            builder.<String, String>stream(Pattern.compile("TEST-TOPIC-\\d"))
                .to(outputTopic, Produced.with(stringSerde, stringSerde));

            // Written by the rebalance callback and read by the polling conditions below.
            final List<String> assignedTopics = new CopyOnWriteArrayList<>();
            streams = new KafkaStreams(builder.build(), streamsConfiguration, new DefaultKafkaClientSupplier() {
                public Consumer<byte[], byte[]> getConsumer(final Map<String, Object> config) {
                    return new KafkaConsumer<byte[], byte[]>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer()) {
                        public void subscribe(final Pattern topics, final ConsumerRebalanceListener rebalanceListener) {
                            super.subscribe(topics, new TheConsumerRebalanceListener(assignedTopics, rebalanceListener));
                        }
                    };
                }
            });
            streams.start();

            TestUtils.waitForCondition(
                () -> assignedTopics.equals(expectedFirstAssignment),
                STREAM_TASKS_NOT_UPDATED);

            CLUSTER.createTopic("TEST-TOPIC-2");

            TestUtils.waitForCondition(
                () -> assignedTopics.equals(expectedSecondAssignment),
                STREAM_TASKS_NOT_UPDATED);

            streams.close();
        } finally {
            // Single cleanup point: runs exactly once on success and on failure.
            CLUSTER.deleteTopicsAndWait("TEST-TOPIC-1", "TEST-TOPIC-2");
        }
    }

    /**
     * Verifies that records from a pattern-matched topic created after startup are
     * processed when the topology has more than one subtopology.
     *
     * <p>One record is produced to each of TEST-TOPIC-1 (pre-existing) and TEST-TOPIC-2
     * (created after the application is running); both must show up in the output topic.
     */
    @Test
    public void testRegexRecordsAreProcessedAfterNewTopicCreatedWithMultipleSubtopologies() throws Exception {
        try {
            CLUSTER.createTopic("TEST-TOPIC-1");

            final StreamsBuilder builder = new StreamsBuilder();
            final KStream<String, String> matchingStream = builder.stream(Pattern.compile("TEST-TOPIC-\\d"));
            // Second pattern source whose result is unused; it only adds another source to the topology.
            builder.stream(Pattern.compile("not-a-match"));

            matchingStream
                .selectKey((key, value) -> key)
                .groupByKey()
                .aggregate(() -> "", (key, value, aggregate) -> value)
                .toStream()
                .to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));

            final Topology topology = builder.build();
            // Precondition of the test name: strictly more than one subtopology.
            // The bound is the literal 1; it is unrelated to the broker count.
            MatcherAssert.assertThat(topology.describe().subtopologies().size(), Matchers.greaterThan(1));

            streams = new KafkaStreams(topology, streamsConfiguration);
            IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);

            CLUSTER.createTopic("TEST-TOPIC-2");

            final KeyValue<String, String> firstRecord = new KeyValue<>("1", "1");
            final KeyValue<String, String> secondRecord = new KeyValue<>("2", "2");

            IntegrationTestUtils.produceKeyValuesSynchronously(
                "TEST-TOPIC-1",
                Collections.singletonList(firstRecord),
                TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class),
                CLUSTER.time);
            IntegrationTestUtils.produceKeyValuesSynchronously(
                "TEST-TOPIC-2",
                Collections.singletonList(secondRecord),
                TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class),
                CLUSTER.time);

            IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(
                TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, StringDeserializer.class),
                outputTopic,
                Arrays.asList(firstRecord, secondRecord));

            streams.close();
        } finally {
            // Single cleanup point: runs exactly once on success and on failure.
            CLUSTER.deleteTopicsAndWait("TEST-TOPIC-1", "TEST-TOPIC-2");
        }
    }

    /**
     * Creates an output topic named {@code "outputTopic_" + i} on the shared cluster.
     *
     * @param i numeric suffix that makes the topic name unique
     * @return the name of the topic that was created
     */
    private String createTopic(int i) throws InterruptedException {
        final String topicName = "outputTopic_" + i;
        CLUSTER.createTopic(topicName);
        return topicName;
    }

    /**
     * Verifies that a pattern-subscribed Streams application drops a topic from its
     * assignment once that topic has been deleted.
     *
     * <p>The consumer's {@code subscribe(Pattern, ...)} is intercepted so that every
     * rebalance is mirrored into {@code assignedTopics}, which the test then polls.
     */
    @Test
    public void testRegexMatchesTopicsAWhenDeleted() throws Exception {
        final Serde<String> stringSerde = Serdes.String();
        final List<String> expectedFirstAssignment = Arrays.asList("TEST-TOPIC-A", "TEST-TOPIC-B");
        final List<String> expectedSecondAssignment = Collections.singletonList("TEST-TOPIC-B");
        // Written by the rebalance callback and read by the polling conditions below.
        final List<String> assignedTopics = new CopyOnWriteArrayList<>();

        try {
            CLUSTER.createTopics("TEST-TOPIC-A", "TEST-TOPIC-B");

            final StreamsBuilder builder = new StreamsBuilder();
            builder.<String, String>stream(Pattern.compile("TEST-TOPIC-[A-Z]"))
                .to(outputTopic, Produced.with(stringSerde, stringSerde));

            streams = new KafkaStreams(builder.build(), streamsConfiguration, new DefaultKafkaClientSupplier() {
                public Consumer<byte[], byte[]> getConsumer(final Map<String, Object> config) {
                    return new KafkaConsumer<byte[], byte[]>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer()) {
                        public void subscribe(final Pattern topics, final ConsumerRebalanceListener rebalanceListener) {
                            super.subscribe(topics, new TheConsumerRebalanceListener(assignedTopics, rebalanceListener));
                        }
                    };
                }
            });
            streams.start();

            TestUtils.waitForCondition(
                () -> assignedTopics.equals(expectedFirstAssignment),
                STREAM_TASKS_NOT_UPDATED);
        } finally {
            // Deleting TEST-TOPIC-A is both the step under test and the cleanup for a failed
            // first phase. Keeping it in finally means it runs exactly once, so a later
            // failure cannot trigger a second deletion of an already-deleted topic.
            CLUSTER.deleteTopicAndWait("TEST-TOPIC-A");
        }

        TestUtils.waitForCondition(
            () -> assignedTopics.equals(expectedSecondAssignment),
            STREAM_TASKS_NOT_UPDATED);
    }

    /**
     * Verifies that a state store attached to a processor fed by a regex-defined source
     * ends up associated with the concrete topic {@code topic-1} once the application runs.
     */
    @Test
    public void shouldAddStateStoreToRegexDefinedSource() throws Exception {
        final StoreBuilder<?> storeBuilder = new MockKeyValueStoreBuilder("testStateStore", false);

        final TopologyWrapper topology = new TopologyWrapper();
        topology.addSource("ingest", Pattern.compile("topic-\\d+"));
        topology.addProcessor("my-processor", new MockApiProcessorSupplier<>(), "ingest");
        topology.addStateStore(storeBuilder, "my-processor");

        streams = new KafkaStreams(topology, streamsConfiguration);
        streams.start();

        TestUtils.waitForCondition(() -> {
            final List<String> sourceTopics =
                topology.getInternalBuilder().stateStoreNameToFullSourceTopicNames().get("testStateStore");
            // Satisfied only when the store has at least one source topic and the first one is topic-1.
            return sourceTopics != null
                && !sourceTopics.isEmpty()
                && sourceTopics.get(0).equals(TOPIC_1);
        }, 30000L, "Did not find topic: [topic-1] connected to state store: [testStateStore]");
    }

    /**
     * Verifies that one application can consume from two pattern sources and one
     * explicitly named topic list at the same time.
     *
     * <p>One value is produced to each of six input topics; the test then checks that
     * exactly those six values (compared after sorting) arrive in the output topic.
     */
    @Test
    public void testShouldReadFromRegexAndNamedTopics() throws Exception {
        final Serde<String> stringSerde = Serdes.String();

        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, String> digitPatternStream = builder.stream(Pattern.compile("topic-\\d"));
        final KStream<String, String> letterPatternStream = builder.stream(Pattern.compile("topic-[A-D]"));
        final KStream<String, String> namedTopicsStream = builder.stream(Arrays.asList(TOPIC_Y, TOPIC_Z));

        digitPatternStream.to(outputTopic, Produced.with(stringSerde, stringSerde));
        letterPatternStream.to(outputTopic, Produced.with(stringSerde, stringSerde));
        namedTopicsStream.to(outputTopic, Produced.with(stringSerde, stringSerde));

        streams = new KafkaStreams(builder.build(), streamsConfiguration);
        streams.start();

        final Properties producerConfig =
            TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class);
        IntegrationTestUtils.produceValuesSynchronously(TOPIC_1, Collections.singleton("topic-1 test"), producerConfig, mockTime);
        IntegrationTestUtils.produceValuesSynchronously(TOPIC_2, Collections.singleton("topic-2 test"), producerConfig, mockTime);
        IntegrationTestUtils.produceValuesSynchronously(TOPIC_A, Collections.singleton("topic-A test"), producerConfig, mockTime);
        IntegrationTestUtils.produceValuesSynchronously(TOPIC_C, Collections.singleton("topic-C test"), producerConfig, mockTime);
        IntegrationTestUtils.produceValuesSynchronously(TOPIC_Y, Collections.singleton("topic-Y test"), producerConfig, mockTime);
        IntegrationTestUtils.produceValuesSynchronously(TOPIC_Z, Collections.singleton("topic-Z test"), producerConfig, mockTime);

        final Properties consumerConfig =
            TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, StringDeserializer.class);
        final List<String> expectedValues =
            Arrays.asList("topic-A test", "topic-1 test", "topic-2 test", "topic-C test", "topic-Y test", "topic-Z test");

        final List<KeyValue<String, String>> receivedRecords =
            IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, outputTopic, 6);
        final List<String> actualValues = new ArrayList<>(6);
        for (final KeyValue<String, String> received : receivedRecords) {
            actualValues.add(received.value);
        }

        // Both sides are sorted before comparing, so arrival order does not matter.
        Collections.sort(actualValues);
        Collections.sort(expectedValues);
        MatcherAssert.assertThat(actualValues, CoreMatchers.equalTo(expectedValues));
    }

    /**
     * Verifies that two Streams instances sharing one configuration and subscribing to the
     * same pattern each end up with the expected topic assignment for the partitioned topics.
     *
     * <p>Each instance gets its own intercepted consumer so that its rebalances are mirrored
     * into a separate list, and the test polls until both lists match the expectation.
     */
    @Test
    public void testMultipleConsumersCanReadFromPartitionedTopic() throws Exception {
        KafkaStreams firstInstance = null;
        KafkaStreams secondInstance = null;
        try {
            final Serde<String> stringSerde = Serdes.String();
            final StreamsBuilder firstBuilder = new StreamsBuilder();
            final StreamsBuilder secondBuilder = new StreamsBuilder();
            final List<String> expectedAssignment = Arrays.asList(PARTITIONED_TOPIC_1, PARTITIONED_TOPIC_2);

            final KStream<String, String> firstStream = firstBuilder.stream(Pattern.compile("partitioned-\\d"));
            final KStream<String, String> secondStream = secondBuilder.stream(Pattern.compile("partitioned-\\d"));
            firstStream.to(outputTopic, Produced.with(stringSerde, stringSerde));
            secondStream.to(outputTopic, Produced.with(stringSerde, stringSerde));

            // One list per instance, written by that instance's rebalance callback.
            final List<String> firstAssignment = new CopyOnWriteArrayList<>();
            final List<String> secondAssignment = new CopyOnWriteArrayList<>();

            firstInstance = new KafkaStreams(firstBuilder.build(), streamsConfiguration, new DefaultKafkaClientSupplier() {
                public Consumer<byte[], byte[]> getConsumer(final Map<String, Object> config) {
                    return new KafkaConsumer<byte[], byte[]>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer()) {
                        public void subscribe(final Pattern topics, final ConsumerRebalanceListener rebalanceListener) {
                            super.subscribe(topics, new TheConsumerRebalanceListener(firstAssignment, rebalanceListener));
                        }
                    };
                }
            });
            secondInstance = new KafkaStreams(secondBuilder.build(), streamsConfiguration, new DefaultKafkaClientSupplier() {
                public Consumer<byte[], byte[]> getConsumer(final Map<String, Object> config) {
                    return new KafkaConsumer<byte[], byte[]>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer()) {
                        public void subscribe(final Pattern topics, final ConsumerRebalanceListener rebalanceListener) {
                            super.subscribe(topics, new TheConsumerRebalanceListener(secondAssignment, rebalanceListener));
                        }
                    };
                }
            });

            firstInstance.start();
            secondInstance.start();

            TestUtils.waitForCondition(
                () -> secondAssignment.equals(expectedAssignment) && firstAssignment.equals(expectedAssignment),
                "topic assignment not completed");
        } finally {
            // Single cleanup point; the null checks cover a failure before either instance was built.
            if (firstInstance != null) {
                firstInstance.close();
            }
            if (secondInstance != null) {
                secondInstance.close();
            }
        }
    }

    /**
     * Verifies that two overlapping pattern sources ({@code foo.*} and {@code f.*}) put the
     * application into an error condition instead of delivering records.
     *
     * <p>The error flag is set either by the state listener (on the ERROR state) or by the
     * uncaught-exception handler. Waiting for two output records is expected to fail with an
     * {@link AssertionError}; only then is the flag checked.
     */
    @Test
    public void testNoMessagesSentExceptionFromOverlappingPatterns() throws Exception {
        final Serde<String> stringSerde = Serdes.String();

        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, String> fooPatternStream = builder.stream(Pattern.compile("foo.*"));
        final KStream<String, String> fPatternStream = builder.stream(Pattern.compile("f.*"));
        fooPatternStream.to(outputTopic, Produced.with(stringSerde, stringSerde));
        fPatternStream.to(outputTopic, Produced.with(stringSerde, stringSerde));

        final AtomicBoolean errorObserved = new AtomicBoolean(false);
        streams = new KafkaStreams(builder.build(), streamsConfiguration);
        streams.setStateListener((newState, oldState) -> {
            if (newState == KafkaStreams.State.ERROR) {
                errorObserved.set(true);
            }
        });
        streams.setUncaughtExceptionHandler(exception -> {
            errorObserved.set(true);
            return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
        });
        streams.start();

        final Properties producerConfig =
            TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class);
        IntegrationTestUtils.produceValuesSynchronously(FA_TOPIC, Collections.singleton("fMessage"), producerConfig, mockTime);
        IntegrationTestUtils.produceValuesSynchronously(FOO_TOPIC, Collections.singleton("fooMessage"), producerConfig, mockTime);

        final Properties consumerConfig =
            TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, StringDeserializer.class);
        try {
            IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, outputTopic, 2, 5000L);
            throw new IllegalStateException("This should not happen: an assertion error should have been thrown before this.");
        } catch (final AssertionError expected) {
            // Expected path: no records arrive, so the wait fails. The flag is checked below.
        }
        // Reached only via the catch above, because the try block always throws.
        MatcherAssert.assertThat(errorObserved.get(), CoreMatchers.is(true));
    }
}
