package org.apache.kafka.streams.processor.internals;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.CreateTopicsOptions;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.DeleteTopicsResult;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.MockAdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.LeaderNotAvailableException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.message.CreateTopicsRequestData;
import org.apache.kafka.common.requests.CreateTopicsRequest;
import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.internals.InternalTopicManager;
import org.apache.log4j.Level;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;

@ExtendWith({MockitoExtension.class})
@MockitoSettings(strictness = Strictness.STRICT_STUBS)
/* loaded from: input_file:org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.class */
public class InternalTopicManagerTest {
    private String threadName;
    private MockAdminClient mockAdminClient;
    private InternalTopicManager internalTopicManager;
    private final Node broker1 = new Node(0, "dummyHost-1", 1234);
    private final Node broker2 = new Node(1, "dummyHost-2", 1234);
    private final List<Node> cluster = new ArrayList<Node>(2) { // from class: org.apache.kafka.streams.processor.internals.InternalTopicManagerTest.1
        {
            add(InternalTopicManagerTest.this.broker1);
            add(InternalTopicManagerTest.this.broker2);
        }
    };
    private final String topic1 = "test_topic";
    private final String topic2 = "test_topic_2";
    private final String topic3 = "test_topic_3";
    private final String topic4 = "test_topic_4";
    private final String topic5 = "test_topic_5";
    private final List<Node> singleReplica = Collections.singletonList(this.broker1);
    private final MockTime time = new MockTime(0);
    private final Map<String, Object> config = new HashMap<String, Object>() { // from class: org.apache.kafka.streams.processor.internals.InternalTopicManagerTest.2
        {
            put("application.id", "app-id");
            put("bootstrap.servers", InternalTopicManagerTest.this.broker1.host() + ":" + InternalTopicManagerTest.this.broker1.port());
            put("replication.factor", 1);
            put(StreamsConfig.producerPrefix("batch.size"), 16384);
            put(StreamsConfig.consumerPrefix("max.poll.interval.ms"), 100);
            put("retry.backoff.ms", 10);
        }
    };

    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/InternalTopicManagerTest$AutoAdvanceMockTime.class */
    private static class AutoAdvanceMockTime extends MockTime {
        private final MockTime time;

        private AutoAdvanceMockTime(MockTime mockTime) {
            this.time = mockTime;
        }

        public long milliseconds() {
            long milliseconds = this.time.milliseconds();
            this.time.sleep(10L);
            return milliseconds;
        }
    }

    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/InternalTopicManagerTest$MockCreateTopicsResult.class */
    private static class MockCreateTopicsResult extends CreateTopicsResult {
        MockCreateTopicsResult(Map<String, KafkaFuture<CreateTopicsResult.TopicMetadataAndConfig>> map) {
            super(map);
        }
    }

    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/InternalTopicManagerTest$MockDeleteTopicsResult.class */
    private static class MockDeleteTopicsResult extends DeleteTopicsResult {
        MockDeleteTopicsResult(Map<String, KafkaFuture<Void>> map) {
            super((Map) null, map);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/InternalTopicManagerTest$MockDescribeConfigsResult.class */
    public static class MockDescribeConfigsResult extends DescribeConfigsResult {
        MockDescribeConfigsResult(Map<ConfigResource, KafkaFuture<Config>> map) {
            super(map);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/InternalTopicManagerTest$MockDescribeTopicsResult.class */
    public static class MockDescribeTopicsResult extends DescribeTopicsResult {
        MockDescribeTopicsResult(Map<String, KafkaFuture<TopicDescription>> map) {
            super((Map) null, map);
        }
    }

    @BeforeEach
    public void init() {
        this.threadName = Thread.currentThread().getName();
        this.mockAdminClient = new MockAdminClient(this.cluster, this.broker1);
        this.internalTopicManager = new InternalTopicManager(this.time, this.mockAdminClient, new StreamsConfig(this.config));
    }

    @AfterEach
    public void shutdown() {
        this.mockAdminClient.close();
    }

    @Test
    public void shouldCreateTopics() throws Exception {
        this.internalTopicManager.setup(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("test_topic", setupRepartitionTopicConfig("test_topic", 1)), Utils.mkEntry("test_topic_2", setupRepartitionTopicConfig("test_topic_2", 1))}));
        Set set = (Set) this.mockAdminClient.listTopics().names().get();
        MatcherAssert.assertThat(Integer.valueOf(set.size()), Matchers.is(2));
        MatcherAssert.assertThat(set, Matchers.hasItem("test_topic"));
        MatcherAssert.assertThat(set, Matchers.hasItem("test_topic_2"));
    }

    @Test
    public void shouldNotCreateTopicsWithEmptyInput() throws Exception {
        this.internalTopicManager.setup(Collections.emptyMap());
        MatcherAssert.assertThat((Set) this.mockAdminClient.listTopics().names().get(), Matchers.empty());
    }

    @Test
    public void shouldOnlyRetryNotSuccessfulFuturesDuringSetup() {
        AdminClient adminClient = (AdminClient) Mockito.mock(AdminClient.class);
        StreamsConfig streamsConfig = new StreamsConfig(this.config);
        InternalTopicManager internalTopicManager = new InternalTopicManager(this.time, adminClient, streamsConfig);
        KafkaFutureImpl kafkaFutureImpl = new KafkaFutureImpl();
        kafkaFutureImpl.completeExceptionally(new TopicExistsException("exists"));
        KafkaFutureImpl kafkaFutureImpl2 = new KafkaFutureImpl();
        kafkaFutureImpl2.complete(new CreateTopicsResult.TopicMetadataAndConfig(Uuid.randomUuid(), 1, 1, new Config(Collections.emptyList())));
        InternalTopicConfig internalTopicConfig = setupRepartitionTopicConfig("test_topic", 1);
        InternalTopicConfig internalTopicConfig2 = setupRepartitionTopicConfig("test_topic_2", 1);
        NewTopic newTopic = newTopic("test_topic", internalTopicConfig, streamsConfig);
        NewTopic newTopic2 = newTopic("test_topic_2", internalTopicConfig2, streamsConfig);
        Mockito.when(adminClient.createTopics(Utils.mkSet(new NewTopic[]{newTopic, newTopic2}))).thenAnswer(invocationOnMock -> {
            return new MockCreateTopicsResult(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("test_topic", kafkaFutureImpl2), Utils.mkEntry("test_topic_2", kafkaFutureImpl)}));
        });
        Mockito.when(adminClient.createTopics(Utils.mkSet(new NewTopic[]{newTopic2}))).thenAnswer(invocationOnMock2 -> {
            return new MockCreateTopicsResult(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("test_topic_2", kafkaFutureImpl2)}));
        });
        internalTopicManager.setup(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("test_topic", internalTopicConfig), Utils.mkEntry("test_topic_2", internalTopicConfig2)}));
    }

    @Test
    public void shouldRetryCreateTopicWhenCreationTimesOut() {
        shouldRetryCreateTopicWhenRetriableExceptionIsThrown(new TimeoutException("timed out"));
    }

    @Test
    public void shouldRetryCreateTopicWhenTopicNotYetDeleted() {
        shouldRetryCreateTopicWhenRetriableExceptionIsThrown(new TopicExistsException("exists"));
    }

    private void shouldRetryCreateTopicWhenRetriableExceptionIsThrown(Exception exc) {
        AdminClient adminClient = (AdminClient) Mockito.mock(AdminClient.class);
        StreamsConfig streamsConfig = new StreamsConfig(this.config);
        InternalTopicManager internalTopicManager = new InternalTopicManager(this.time, adminClient, streamsConfig);
        new KafkaFutureImpl().completeExceptionally(exc);
        KafkaFutureImpl kafkaFutureImpl = new KafkaFutureImpl();
        kafkaFutureImpl.complete(new CreateTopicsResult.TopicMetadataAndConfig(Uuid.randomUuid(), 1, 1, new Config(Collections.emptyList())));
        InternalTopicConfig internalTopicConfig = setupRepartitionTopicConfig("test_topic", 1);
        Mockito.when(adminClient.createTopics(Utils.mkSet(new NewTopic[]{newTopic("test_topic", internalTopicConfig, streamsConfig)}))).thenAnswer(invocationOnMock -> {
            return new MockCreateTopicsResult(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("test_topic", kafkaFutureImpl)}));
        }).thenAnswer(invocationOnMock2 -> {
            return new MockCreateTopicsResult(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("test_topic_2", kafkaFutureImpl)}));
        });
        internalTopicManager.setup(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("test_topic", internalTopicConfig)}));
    }

    @Test
    public void shouldThrowInformativeExceptionForOlderBrokers() {
        InternalTopicManager internalTopicManager = new InternalTopicManager(this.time, new MockAdminClient() { // from class: org.apache.kafka.streams.processor.internals.InternalTopicManagerTest.3
            public CreateTopicsResult createTopics(Collection<NewTopic> collection, CreateTopicsOptions createTopicsOptions) {
                CreateTopicsRequestData.CreatableTopic creatableTopic = new CreateTopicsRequestData.CreatableTopic();
                creatableTopic.setAssignments(new CreateTopicsRequestData.CreatableReplicaAssignmentCollection());
                creatableTopic.setNumPartitions(1);
                creatableTopic.setReplicationFactor((short) -1);
                CreateTopicsRequestData.CreatableTopicCollection creatableTopicCollection = new CreateTopicsRequestData.CreatableTopicCollection();
                creatableTopicCollection.add(creatableTopic);
                try {
                    new CreateTopicsRequest.Builder(new CreateTopicsRequestData().setTopics(creatableTopicCollection).setTimeoutMs(0).setValidateOnly(createTopicsOptions.shouldValidateOnly())).build((short) 3);
                    throw new IllegalStateException("Building CreateTopicRequest should have thrown.");
                } catch (UnsupportedVersionException e) {
                    KafkaFutureImpl kafkaFutureImpl = new KafkaFutureImpl();
                    kafkaFutureImpl.completeExceptionally(e);
                    return new CreateTopicsResult(Collections.singletonMap("test_topic", kafkaFutureImpl)) { // from class: org.apache.kafka.streams.processor.internals.InternalTopicManagerTest.3.1
                    };
                }
            }
        }, new StreamsConfig(this.config));
        RepartitionTopicConfig repartitionTopicConfig = new RepartitionTopicConfig("test_topic", Collections.emptyMap());
        repartitionTopicConfig.setNumberOfPartitions(1);
        MatcherAssert.assertThat(Assertions.assertThrows(StreamsException.class, () -> {
            internalTopicManager.makeReady(Collections.singletonMap("test_topic", repartitionTopicConfig));
        }).getMessage(), Matchers.equalTo("Could not create topic test_topic, because brokers don't support configuration replication.factor=-1. You can change the replication.factor config or upgrade your brokers to version 2.4 or newer to avoid this error."));
    }

    @Test
    public void shouldThrowTimeoutExceptionInGetPartitionInfo() {
        setupTopicInMockAdminClient("test_topic", Collections.emptyMap());
        MockTime mockTime = new MockTime(5L);
        this.mockAdminClient.timeoutNextRequest(Integer.MAX_VALUE);
        InternalTopicManager internalTopicManager = new InternalTopicManager(mockTime, this.mockAdminClient, new StreamsConfig(this.config));
        MatcherAssert.assertThat(Assertions.assertThrows(TimeoutException.class, () -> {
            internalTopicManager.getTopicPartitionInfo(Collections.singleton("test_topic"));
        }).getMessage(), Matchers.is("Could not create topics within 50 milliseconds. This can happen if the Kafka cluster is temporarily not available."));
    }

    @Test
    public void shouldThrowTimeoutExceptionIfTopicExistsDuringSetup() {
        setupTopicInMockAdminClient("test_topic", Collections.emptyMap());
        InternalTopicManager internalTopicManager = new InternalTopicManager(new MockTime(((Integer) this.config.get(StreamsConfig.consumerPrefix("max.poll.interval.ms"))).intValue() / 15), this.mockAdminClient, new StreamsConfig(this.config));
        InternalTopicConfig internalTopicConfig = setupRepartitionTopicConfig("test_topic", 1);
        MatcherAssert.assertThat(Assertions.assertThrows(TimeoutException.class, () -> {
            internalTopicManager.setup(Collections.singletonMap("test_topic", internalTopicConfig));
        }).getMessage(), Matchers.is("Could not create internal topics within " + (((Integer) this.config.get(StreamsConfig.consumerPrefix("max.poll.interval.ms"))).intValue() / 2) + " milliseconds. This can happen if the Kafka cluster is temporarily not available or a topic is marked for deletion and the broker did not complete its deletion within the timeout. The last errors seen per topic are: {test_topic=org.apache.kafka.common.errors.TopicExistsException: Topic test_topic exists already.}"));
    }

    @Test
    public void shouldThrowTimeoutExceptionIfGetPartitionInfoHasTopicDescriptionTimeout() {
        this.mockAdminClient.timeoutNextRequest(1);
        InternalTopicManager internalTopicManager = new InternalTopicManager(this.time, this.mockAdminClient, new StreamsConfig(this.config));
        try {
            internalTopicManager.getTopicPartitionInfo(new HashSet(Collections.singletonList("test_topic")), (Set) null);
        } catch (TimeoutException e) {
            Assertions.assertEquals(TimeoutException.class, e.getCause().getClass());
        }
        this.mockAdminClient.timeoutNextRequest(1);
        try {
            internalTopicManager.getTopicPartitionInfo(new HashSet(Collections.singletonList("test_topic_2")), (Set) null);
        } catch (TimeoutException e2) {
            Assertions.assertEquals(TimeoutException.class, e2.getCause().getClass());
        }
    }

    @Test
    public void shouldThrowTimeoutExceptionIfGetNumPartitionsHasTopicDescriptionTimeout() {
        this.mockAdminClient.timeoutNextRequest(1);
        InternalTopicManager internalTopicManager = new InternalTopicManager(this.time, this.mockAdminClient, new StreamsConfig(this.config));
        try {
            internalTopicManager.getNumPartitions(new HashSet(Arrays.asList("test_topic")), new HashSet(Arrays.asList("test_topic_2")));
        } catch (TimeoutException e) {
            Assertions.assertEquals(TimeoutException.class, e.getCause().getClass());
        }
        this.mockAdminClient.timeoutNextRequest(1);
        try {
            internalTopicManager.getNumPartitions(new HashSet(Arrays.asList("test_topic")), new HashSet(Arrays.asList("test_topic_2")));
        } catch (TimeoutException e2) {
            Assertions.assertEquals(TimeoutException.class, e2.getCause().getClass());
        }
    }

    @Test
    public void shouldThrowWhenCreateTopicsThrowsUnexpectedException() {
        AdminClient adminClient = (AdminClient) Mockito.mock(AdminClient.class);
        StreamsConfig streamsConfig = new StreamsConfig(this.config);
        InternalTopicManager internalTopicManager = new InternalTopicManager(this.time, adminClient, streamsConfig);
        InternalTopicConfig internalTopicConfig = setupRepartitionTopicConfig("test_topic", 1);
        KafkaFutureImpl kafkaFutureImpl = new KafkaFutureImpl();
        kafkaFutureImpl.completeExceptionally(new IllegalStateException("Nobody expects the Spanish inquisition"));
        Mockito.when(adminClient.createTopics(Utils.mkSet(new NewTopic[]{newTopic("test_topic", internalTopicConfig, streamsConfig)}))).thenAnswer(invocationOnMock -> {
            return new MockCreateTopicsResult(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("test_topic", kafkaFutureImpl)}));
        });
        Assertions.assertThrows(StreamsException.class, () -> {
            internalTopicManager.setup(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("test_topic", internalTopicConfig)}));
        });
    }

    @Test
    public void shouldThrowWhenCreateTopicsResultsDoNotContainTopic() {
        AdminClient adminClient = (AdminClient) Mockito.mock(AdminClient.class);
        StreamsConfig streamsConfig = new StreamsConfig(this.config);
        InternalTopicManager internalTopicManager = new InternalTopicManager(this.time, adminClient, streamsConfig);
        InternalTopicConfig internalTopicConfig = setupRepartitionTopicConfig("test_topic", 1);
        Mockito.when(adminClient.createTopics(Utils.mkSet(new NewTopic[]{newTopic("test_topic", internalTopicConfig, streamsConfig)}))).thenAnswer(invocationOnMock -> {
            return new MockCreateTopicsResult(Collections.singletonMap("test_topic_2", new KafkaFutureImpl()));
        });
        Assertions.assertThrows(IllegalStateException.class, () -> {
            internalTopicManager.setup(Collections.singletonMap("test_topic", internalTopicConfig));
        });
    }

    @Test
    public void shouldThrowTimeoutExceptionWhenCreateTopicExceedsTimeout() {
        AdminClient adminClient = (AdminClient) Mockito.mock(AdminClient.class);
        MockTime mockTime = new MockTime(((Integer) this.config.get(StreamsConfig.consumerPrefix("max.poll.interval.ms"))).intValue() / 3);
        StreamsConfig streamsConfig = new StreamsConfig(this.config);
        InternalTopicManager internalTopicManager = new InternalTopicManager(mockTime, adminClient, streamsConfig);
        KafkaFutureImpl kafkaFutureImpl = new KafkaFutureImpl();
        kafkaFutureImpl.completeExceptionally(new TimeoutException());
        InternalTopicConfig internalTopicConfig = setupRepartitionTopicConfig("test_topic", 1);
        Mockito.when(adminClient.createTopics(Utils.mkSet(new NewTopic[]{newTopic("test_topic", internalTopicConfig, streamsConfig)}))).thenAnswer(invocationOnMock -> {
            return new MockCreateTopicsResult(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("test_topic", kafkaFutureImpl)}));
        });
        Assertions.assertThrows(TimeoutException.class, () -> {
            internalTopicManager.setup(Collections.singletonMap("test_topic", internalTopicConfig));
        });
    }

    @Test
    public void shouldThrowTimeoutExceptionWhenFuturesNeverCompleteDuringSetup() {
        AdminClient adminClient = (AdminClient) Mockito.mock(AdminClient.class);
        MockTime mockTime = new MockTime(((Integer) this.config.get(StreamsConfig.consumerPrefix("max.poll.interval.ms"))).intValue() / 3);
        StreamsConfig streamsConfig = new StreamsConfig(this.config);
        InternalTopicManager internalTopicManager = new InternalTopicManager(mockTime, adminClient, streamsConfig);
        KafkaFutureImpl kafkaFutureImpl = new KafkaFutureImpl();
        InternalTopicConfig internalTopicConfig = setupRepartitionTopicConfig("test_topic", 1);
        Mockito.when(adminClient.createTopics(Utils.mkSet(new NewTopic[]{newTopic("test_topic", internalTopicConfig, streamsConfig)}))).thenAnswer(invocationOnMock -> {
            return new MockCreateTopicsResult(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("test_topic", kafkaFutureImpl)}));
        });
        Assertions.assertThrows(TimeoutException.class, () -> {
            internalTopicManager.setup(Collections.singletonMap("test_topic", internalTopicConfig));
        });
    }

    @Test
    public void shouldCleanUpWhenUnexpectedExceptionIsThrownDuringSetup() {
        AdminClient adminClient = (AdminClient) Mockito.mock(AdminClient.class);
        StreamsConfig streamsConfig = new StreamsConfig(this.config);
        InternalTopicManager internalTopicManager = new InternalTopicManager(new MockTime(((Integer) this.config.get(StreamsConfig.consumerPrefix("max.poll.interval.ms"))).intValue() / 3), adminClient, streamsConfig);
        InternalTopicConfig internalTopicConfig = setupRepartitionTopicConfig("test_topic", 1);
        InternalTopicConfig internalTopicConfig2 = setupRepartitionTopicConfig("test_topic_2", 1);
        setupCleanUpScenario(adminClient, streamsConfig, internalTopicConfig, internalTopicConfig2);
        KafkaFutureImpl kafkaFutureImpl = new KafkaFutureImpl();
        kafkaFutureImpl.complete((Object) null);
        Mockito.when(adminClient.deleteTopics(Utils.mkSet(new String[]{"test_topic"}))).thenAnswer(invocationOnMock -> {
            return new MockDeleteTopicsResult(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("test_topic", kafkaFutureImpl)}));
        });
        Assertions.assertThrows(StreamsException.class, () -> {
            internalTopicManager.setup(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("test_topic", internalTopicConfig), Utils.mkEntry("test_topic_2", internalTopicConfig2)}));
        });
    }

    @Test
    public void shouldCleanUpWhenCreateTopicsResultsDoNotContainTopic() {
        AdminClient adminClient = (AdminClient) Mockito.mock(AdminClient.class);
        StreamsConfig streamsConfig = new StreamsConfig(this.config);
        InternalTopicManager internalTopicManager = new InternalTopicManager(this.time, adminClient, streamsConfig);
        InternalTopicConfig internalTopicConfig = setupRepartitionTopicConfig("test_topic", 1);
        InternalTopicConfig internalTopicConfig2 = setupRepartitionTopicConfig("test_topic_2", 1);
        KafkaFutureImpl kafkaFutureImpl = new KafkaFutureImpl();
        kafkaFutureImpl.completeExceptionally(new TopicExistsException("exists"));
        KafkaFutureImpl kafkaFutureImpl2 = new KafkaFutureImpl();
        kafkaFutureImpl2.complete(new CreateTopicsResult.TopicMetadataAndConfig(Uuid.randomUuid(), 1, 1, new Config(Collections.emptyList())));
        NewTopic newTopic = newTopic("test_topic", internalTopicConfig, streamsConfig);
        NewTopic newTopic2 = newTopic("test_topic_2", internalTopicConfig2, streamsConfig);
        Mockito.when(adminClient.createTopics(Utils.mkSet(new NewTopic[]{newTopic, newTopic2}))).thenAnswer(invocationOnMock -> {
            return new MockCreateTopicsResult(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("test_topic", kafkaFutureImpl2), Utils.mkEntry("test_topic_2", kafkaFutureImpl)}));
        });
        Mockito.when(adminClient.createTopics(Utils.mkSet(new NewTopic[]{newTopic2}))).thenAnswer(invocationOnMock2 -> {
            return new MockCreateTopicsResult(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("test_topic_3", kafkaFutureImpl2)}));
        });
        KafkaFutureImpl kafkaFutureImpl3 = new KafkaFutureImpl();
        kafkaFutureImpl3.complete((Object) null);
        Mockito.when(adminClient.deleteTopics(Utils.mkSet(new String[]{"test_topic"}))).thenAnswer(invocationOnMock3 -> {
            return new MockDeleteTopicsResult(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("test_topic", kafkaFutureImpl3)}));
        });
        Assertions.assertThrows(IllegalStateException.class, () -> {
            internalTopicManager.setup(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("test_topic", internalTopicConfig), Utils.mkEntry("test_topic_2", internalTopicConfig2)}));
        });
    }

    @Test
    public void shouldCleanUpWhenCreateTopicsTimesOut() {
        AdminClient adminClient = (AdminClient) Mockito.mock(AdminClient.class);
        StreamsConfig streamsConfig = new StreamsConfig(this.config);
        InternalTopicManager internalTopicManager = new InternalTopicManager(new MockTime(((Integer) this.config.get(StreamsConfig.consumerPrefix("max.poll.interval.ms"))).intValue() / 3), adminClient, streamsConfig);
        InternalTopicConfig internalTopicConfig = setupRepartitionTopicConfig("test_topic", 1);
        InternalTopicConfig internalTopicConfig2 = setupRepartitionTopicConfig("test_topic_2", 1);
        KafkaFutureImpl kafkaFutureImpl = new KafkaFutureImpl();
        kafkaFutureImpl.completeExceptionally(new TopicExistsException("exists"));
        KafkaFutureImpl kafkaFutureImpl2 = new KafkaFutureImpl();
        kafkaFutureImpl2.complete(new CreateTopicsResult.TopicMetadataAndConfig(Uuid.randomUuid(), 1, 1, new Config(Collections.emptyList())));
        NewTopic newTopic = newTopic("test_topic", internalTopicConfig, streamsConfig);
        NewTopic newTopic2 = newTopic("test_topic_2", internalTopicConfig2, streamsConfig);
        Mockito.when(adminClient.createTopics(Utils.mkSet(new NewTopic[]{newTopic, newTopic2}))).thenAnswer(invocationOnMock -> {
            return new MockCreateTopicsResult(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("test_topic", kafkaFutureImpl2), Utils.mkEntry("test_topic_2", kafkaFutureImpl)}));
        });
        KafkaFutureImpl kafkaFutureImpl3 = new KafkaFutureImpl();
        Mockito.when(adminClient.createTopics(Utils.mkSet(new NewTopic[]{newTopic2}))).thenAnswer(invocationOnMock2 -> {
            return new MockCreateTopicsResult(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("test_topic_2", kafkaFutureImpl3)}));
        });
        KafkaFutureImpl kafkaFutureImpl4 = new KafkaFutureImpl();
        kafkaFutureImpl4.complete((Object) null);
        Mockito.when(adminClient.deleteTopics(Utils.mkSet(new String[]{"test_topic"}))).thenAnswer(invocationOnMock3 -> {
            return new MockDeleteTopicsResult(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("test_topic", kafkaFutureImpl4)}));
        });
        Assertions.assertThrows(TimeoutException.class, () -> {
            internalTopicManager.setup(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("test_topic", internalTopicConfig), Utils.mkEntry("test_topic_2", internalTopicConfig2)}));
        });
    }

    @Test
    public void shouldRetryDeleteTopicWhenTopicUnknown() {
        shouldRetryDeleteTopicWhenRetriableException(new UnknownTopicOrPartitionException());
    }

    @Test
    public void shouldRetryDeleteTopicWhenLeaderNotAvailable() {
        shouldRetryDeleteTopicWhenRetriableException(new LeaderNotAvailableException("leader not available"));
    }

    @Test
    public void shouldRetryDeleteTopicWhenFutureTimesOut() {
        shouldRetryDeleteTopicWhenRetriableException(new TimeoutException("timed out"));
    }

    private void shouldRetryDeleteTopicWhenRetriableException(Exception exc) {
        AdminClient adminClient = (AdminClient) Mockito.mock(AdminClient.class);
        StreamsConfig streamsConfig = new StreamsConfig(this.config);
        InternalTopicManager internalTopicManager = new InternalTopicManager(this.time, adminClient, streamsConfig);
        InternalTopicConfig internalTopicConfig = setupRepartitionTopicConfig("test_topic", 1);
        InternalTopicConfig internalTopicConfig2 = setupRepartitionTopicConfig("test_topic_2", 1);
        setupCleanUpScenario(adminClient, streamsConfig, internalTopicConfig, internalTopicConfig2);
        KafkaFutureImpl kafkaFutureImpl = new KafkaFutureImpl();
        kafkaFutureImpl.completeExceptionally(exc);
        KafkaFutureImpl kafkaFutureImpl2 = new KafkaFutureImpl();
        kafkaFutureImpl2.complete((Object) null);
        Mockito.when(adminClient.deleteTopics(Utils.mkSet(new String[]{"test_topic"}))).thenAnswer(invocationOnMock -> {
            return new MockDeleteTopicsResult(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("test_topic", kafkaFutureImpl)}));
        }).thenAnswer(invocationOnMock2 -> {
            return new MockDeleteTopicsResult(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("test_topic", kafkaFutureImpl2)}));
        });
        Assertions.assertThrows(StreamsException.class, () -> {
            internalTopicManager.setup(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("test_topic", internalTopicConfig), Utils.mkEntry("test_topic_2", internalTopicConfig2)}));
        });
    }

    @Test
    public void shouldThrowTimeoutExceptionWhenFuturesNeverCompleteDuringCleanUp() {
        AdminClient adminClient = (AdminClient) Mockito.mock(AdminClient.class);
        StreamsConfig streamsConfig = new StreamsConfig(this.config);
        InternalTopicManager internalTopicManager = new InternalTopicManager(new MockTime(((Integer) this.config.get(StreamsConfig.consumerPrefix("max.poll.interval.ms"))).intValue() / 3), adminClient, streamsConfig);
        InternalTopicConfig internalTopicConfig = setupRepartitionTopicConfig("test_topic", 1);
        InternalTopicConfig internalTopicConfig2 = setupRepartitionTopicConfig("test_topic_2", 1);
        setupCleanUpScenario(adminClient, streamsConfig, internalTopicConfig, internalTopicConfig2);
        KafkaFutureImpl kafkaFutureImpl = new KafkaFutureImpl();
        Mockito.when(adminClient.deleteTopics(Utils.mkSet(new String[]{"test_topic"}))).thenAnswer(invocationOnMock -> {
            return new MockDeleteTopicsResult(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("test_topic", kafkaFutureImpl)}));
        });
        Assertions.assertThrows(TimeoutException.class, () -> {
            internalTopicManager.setup(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("test_topic", internalTopicConfig), Utils.mkEntry("test_topic_2", internalTopicConfig2)}));
        });
    }

    @Test
    public void shouldThrowWhenDeleteTopicsThrowsUnexpectedException() {
        AdminClient adminClient = (AdminClient) Mockito.mock(AdminClient.class);
        StreamsConfig streamsConfig = new StreamsConfig(this.config);
        InternalTopicManager internalTopicManager = new InternalTopicManager(this.time, adminClient, streamsConfig);
        InternalTopicConfig internalTopicConfig = setupRepartitionTopicConfig("test_topic", 1);
        InternalTopicConfig internalTopicConfig2 = setupRepartitionTopicConfig("test_topic_2", 1);
        setupCleanUpScenario(adminClient, streamsConfig, internalTopicConfig, internalTopicConfig2);
        KafkaFutureImpl kafkaFutureImpl = new KafkaFutureImpl();
        kafkaFutureImpl.completeExceptionally(new IllegalStateException("Nobody expects the Spanish inquisition"));
        Mockito.when(adminClient.deleteTopics(Utils.mkSet(new String[]{"test_topic"}))).thenAnswer(invocationOnMock -> {
            return new MockDeleteTopicsResult(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("test_topic", kafkaFutureImpl)}));
        });
        Assertions.assertThrows(StreamsException.class, () -> {
            internalTopicManager.setup(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("test_topic", internalTopicConfig), Utils.mkEntry("test_topic_2", internalTopicConfig2)}));
        });
    }

    private void setupCleanUpScenario(AdminClient adminClient, StreamsConfig streamsConfig, InternalTopicConfig internalTopicConfig, InternalTopicConfig internalTopicConfig2) {
        KafkaFutureImpl kafkaFutureImpl = new KafkaFutureImpl();
        kafkaFutureImpl.completeExceptionally(new TopicExistsException("exists"));
        KafkaFutureImpl kafkaFutureImpl2 = new KafkaFutureImpl();
        kafkaFutureImpl2.completeExceptionally(new IllegalStateException("Nobody expects the Spanish inquisition"));
        KafkaFutureImpl kafkaFutureImpl3 = new KafkaFutureImpl();
        kafkaFutureImpl3.complete(new CreateTopicsResult.TopicMetadataAndConfig(Uuid.randomUuid(), 1, 1, new Config(Collections.emptyList())));
        NewTopic newTopic = newTopic("test_topic", internalTopicConfig, streamsConfig);
        NewTopic newTopic2 = newTopic("test_topic_2", internalTopicConfig2, streamsConfig);
        Mockito.when(adminClient.createTopics(Utils.mkSet(new NewTopic[]{newTopic, newTopic2}))).thenAnswer(invocationOnMock -> {
            return new MockCreateTopicsResult(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("test_topic", kafkaFutureImpl3), Utils.mkEntry("test_topic_2", kafkaFutureImpl)}));
        });
        Mockito.when(adminClient.createTopics(Utils.mkSet(new NewTopic[]{newTopic2}))).thenAnswer(invocationOnMock2 -> {
            return new MockCreateTopicsResult(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("test_topic_2", kafkaFutureImpl2)}));
        });
    }

    @Test
    public void shouldReturnCorrectPartitionCounts() {
        this.mockAdminClient.addTopic(false, "test_topic", Collections.singletonList(new TopicPartitionInfo(0, this.broker1, this.singleReplica, Collections.emptyList())), (Map) null);
        Assertions.assertEquals(Collections.singletonMap("test_topic", 1), this.internalTopicManager.getNumPartitions(Collections.singleton("test_topic"), Collections.emptySet()));
    }

    @Test
    public void shouldReturnCorrectPartitionInfo() {
        TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(0, this.broker1, this.singleReplica, Collections.emptyList());
        this.mockAdminClient.addTopic(false, "test_topic", Collections.singletonList(topicPartitionInfo), (Map) null);
        Assertions.assertEquals(Collections.singletonMap("test_topic", Collections.singletonList(topicPartitionInfo)), this.internalTopicManager.getTopicPartitionInfo(Collections.singleton("test_topic")));
    }

    @Test
    public void shouldCreateRequiredTopics() throws Exception {
        RepartitionTopicConfig repartitionTopicConfig = new RepartitionTopicConfig("test_topic", Collections.emptyMap());
        repartitionTopicConfig.setNumberOfPartitions(1);
        UnwindowedUnversionedChangelogTopicConfig unwindowedUnversionedChangelogTopicConfig = new UnwindowedUnversionedChangelogTopicConfig("test_topic_2", Collections.emptyMap());
        unwindowedUnversionedChangelogTopicConfig.setNumberOfPartitions(1);
        WindowedChangelogTopicConfig windowedChangelogTopicConfig = new WindowedChangelogTopicConfig("test_topic_3", Collections.emptyMap(), 10L);
        windowedChangelogTopicConfig.setNumberOfPartitions(1);
        VersionedChangelogTopicConfig versionedChangelogTopicConfig = new VersionedChangelogTopicConfig("test_topic_4", Collections.emptyMap(), 12L);
        versionedChangelogTopicConfig.setNumberOfPartitions(1);
        this.internalTopicManager.makeReady(Collections.singletonMap("test_topic", repartitionTopicConfig));
        this.internalTopicManager.makeReady(Collections.singletonMap("test_topic_2", unwindowedUnversionedChangelogTopicConfig));
        this.internalTopicManager.makeReady(Collections.singletonMap("test_topic_3", windowedChangelogTopicConfig));
        this.internalTopicManager.makeReady(Collections.singletonMap("test_topic_4", versionedChangelogTopicConfig));
        Assertions.assertEquals(Utils.mkSet(new String[]{"test_topic", "test_topic_2", "test_topic_3", "test_topic_4"}), this.mockAdminClient.listTopics().names().get());
        Assertions.assertEquals(new TopicDescription("test_topic", false, new ArrayList<TopicPartitionInfo>() { // from class: org.apache.kafka.streams.processor.internals.InternalTopicManagerTest.4
            {
                add(new TopicPartitionInfo(0, InternalTopicManagerTest.this.broker1, InternalTopicManagerTest.this.singleReplica, Collections.emptyList(), Collections.emptyList(), Collections.emptyList()));
            }
        }), ((KafkaFuture) this.mockAdminClient.describeTopics(Collections.singleton("test_topic")).topicNameValues().get("test_topic")).get());
        Assertions.assertEquals(new TopicDescription("test_topic_2", false, new ArrayList<TopicPartitionInfo>() { // from class: org.apache.kafka.streams.processor.internals.InternalTopicManagerTest.5
            {
                add(new TopicPartitionInfo(0, InternalTopicManagerTest.this.broker1, InternalTopicManagerTest.this.singleReplica, Collections.emptyList(), Collections.emptyList(), Collections.emptyList()));
            }
        }), ((KafkaFuture) this.mockAdminClient.describeTopics(Collections.singleton("test_topic_2")).topicNameValues().get("test_topic_2")).get());
        Assertions.assertEquals(new TopicDescription("test_topic_3", false, new ArrayList<TopicPartitionInfo>() { // from class: org.apache.kafka.streams.processor.internals.InternalTopicManagerTest.6
            {
                add(new TopicPartitionInfo(0, InternalTopicManagerTest.this.broker1, InternalTopicManagerTest.this.singleReplica, Collections.emptyList(), Collections.emptyList(), Collections.emptyList()));
            }
        }), ((KafkaFuture) this.mockAdminClient.describeTopics(Collections.singleton("test_topic_3")).topicNameValues().get("test_topic_3")).get());
        Assertions.assertEquals(new TopicDescription("test_topic_4", false, new ArrayList<TopicPartitionInfo>() { // from class: org.apache.kafka.streams.processor.internals.InternalTopicManagerTest.7
            {
                add(new TopicPartitionInfo(0, InternalTopicManagerTest.this.broker1, InternalTopicManagerTest.this.singleReplica, Collections.emptyList(), Collections.emptyList(), Collections.emptyList()));
            }
        }), ((KafkaFuture) this.mockAdminClient.describeTopics(Collections.singleton("test_topic_4")).topicNameValues().get("test_topic_4")).get());
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, "test_topic");
        ConfigResource configResource2 = new ConfigResource(ConfigResource.Type.TOPIC, "test_topic_2");
        ConfigResource configResource3 = new ConfigResource(ConfigResource.Type.TOPIC, "test_topic_3");
        ConfigResource configResource4 = new ConfigResource(ConfigResource.Type.TOPIC, "test_topic_4");
        Assertions.assertEquals(new ConfigEntry("cleanup.policy", "delete"), ((Config) ((KafkaFuture) this.mockAdminClient.describeConfigs(Collections.singleton(configResource)).values().get(configResource)).get()).get("cleanup.policy"));
        Assertions.assertEquals(new ConfigEntry("cleanup.policy", "compact"), ((Config) ((KafkaFuture) this.mockAdminClient.describeConfigs(Collections.singleton(configResource2)).values().get(configResource2)).get()).get("cleanup.policy"));
        Assertions.assertEquals(new ConfigEntry("cleanup.policy", "compact,delete"), ((Config) ((KafkaFuture) this.mockAdminClient.describeConfigs(Collections.singleton(configResource3)).values().get(configResource3)).get()).get("cleanup.policy"));
        Assertions.assertEquals(new ConfigEntry("cleanup.policy", "compact"), ((Config) ((KafkaFuture) this.mockAdminClient.describeConfigs(Collections.singleton(configResource4)).values().get(configResource4)).get()).get("cleanup.policy"));
    }

    @Test
    public void shouldCompleteTopicValidationOnRetry() {
        AdminClient adminClient = (AdminClient) Mockito.mock(AdminClient.class);
        InternalTopicManager internalTopicManager = new InternalTopicManager(this.time, adminClient, new StreamsConfig(this.config));
        TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(0, this.broker1, Collections.singletonList(this.broker1), Collections.singletonList(this.broker1));
        KafkaFutureImpl kafkaFutureImpl = new KafkaFutureImpl();
        KafkaFutureImpl kafkaFutureImpl2 = new KafkaFutureImpl();
        kafkaFutureImpl.complete(new TopicDescription("test_topic", false, Collections.singletonList(topicPartitionInfo), Collections.emptySet()));
        kafkaFutureImpl2.completeExceptionally(new UnknownTopicOrPartitionException("KABOOM!"));
        KafkaFutureImpl kafkaFutureImpl3 = new KafkaFutureImpl();
        kafkaFutureImpl3.completeExceptionally(new TopicExistsException("KABOOM!"));
        Mockito.when(adminClient.describeTopics(Utils.mkSet(new String[]{"test_topic", "test_topic_2"}))).thenAnswer(invocationOnMock -> {
            return new MockDescribeTopicsResult(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("test_topic", kafkaFutureImpl), Utils.mkEntry("test_topic_2", kafkaFutureImpl2)}));
        });
        Mockito.when(adminClient.createTopics(Collections.singleton(new NewTopic("test_topic_2", Optional.of(1), Optional.of((short) 1)).configs(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("cleanup.policy", "compact"), Utils.mkEntry("message.timestamp.type", "CreateTime")}))))).thenAnswer(invocationOnMock2 -> {
            return new MockCreateTopicsResult(Collections.singletonMap("test_topic_2", kafkaFutureImpl3));
        });
        Mockito.when(adminClient.describeTopics(Collections.singleton("test_topic_2"))).thenAnswer(invocationOnMock3 -> {
            return new MockDescribeTopicsResult(Collections.singletonMap("test_topic_2", kafkaFutureImpl));
        });
        UnwindowedUnversionedChangelogTopicConfig unwindowedUnversionedChangelogTopicConfig = new UnwindowedUnversionedChangelogTopicConfig("test_topic", Collections.emptyMap());
        unwindowedUnversionedChangelogTopicConfig.setNumberOfPartitions(1);
        UnwindowedUnversionedChangelogTopicConfig unwindowedUnversionedChangelogTopicConfig2 = new UnwindowedUnversionedChangelogTopicConfig("test_topic_2", Collections.emptyMap());
        unwindowedUnversionedChangelogTopicConfig2.setNumberOfPartitions(1);
        internalTopicManager.makeReady(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("test_topic", unwindowedUnversionedChangelogTopicConfig), Utils.mkEntry("test_topic_2", unwindowedUnversionedChangelogTopicConfig2)}));
    }

    @Test
    public void shouldNotCreateTopicIfExistsWithDifferentPartitions() {
        this.mockAdminClient.addTopic(false, "test_topic", new ArrayList<TopicPartitionInfo>() { // from class: org.apache.kafka.streams.processor.internals.InternalTopicManagerTest.8
            {
                add(new TopicPartitionInfo(0, InternalTopicManagerTest.this.broker1, InternalTopicManagerTest.this.singleReplica, Collections.emptyList()));
                add(new TopicPartitionInfo(1, InternalTopicManagerTest.this.broker1, InternalTopicManagerTest.this.singleReplica, Collections.emptyList()));
            }
        }, (Map) null);
        try {
            RepartitionTopicConfig repartitionTopicConfig = new RepartitionTopicConfig("test_topic", Collections.emptyMap());
            repartitionTopicConfig.setNumberOfPartitions(1);
            this.internalTopicManager.makeReady(Collections.singletonMap("test_topic", repartitionTopicConfig));
            Assertions.fail("Should have thrown StreamsException");
        } catch (StreamsException e) {
        }
    }

    @Test
    public void shouldNotThrowExceptionIfExistsWithDifferentReplication() {
        this.mockAdminClient.addTopic(false, "test_topic", Collections.singletonList(new TopicPartitionInfo(0, this.broker1, this.cluster, Collections.emptyList())), (Map) null);
        InternalTopicManager internalTopicManager = new InternalTopicManager(this.time, this.mockAdminClient, new StreamsConfig(this.config));
        RepartitionTopicConfig repartitionTopicConfig = new RepartitionTopicConfig("test_topic", Collections.emptyMap());
        repartitionTopicConfig.setNumberOfPartitions(1);
        internalTopicManager.makeReady(Collections.singletonMap("test_topic", repartitionTopicConfig));
    }

    @Test
    public void shouldNotThrowExceptionForEmptyTopicMap() {
        this.internalTopicManager.makeReady(Collections.emptyMap());
    }

    @Test
    public void shouldExhaustRetriesOnTimeoutExceptionForMakeReady() {
        this.mockAdminClient.timeoutNextRequest(5);
        InternalTopicManager internalTopicManager = new InternalTopicManager(new AutoAdvanceMockTime(this.time), this.mockAdminClient, new StreamsConfig(this.config));
        RepartitionTopicConfig repartitionTopicConfig = new RepartitionTopicConfig("test_topic", Collections.emptyMap());
        repartitionTopicConfig.setNumberOfPartitions(1);
        try {
            internalTopicManager.makeReady(Collections.singletonMap("test_topic", repartitionTopicConfig));
            Assertions.fail("Should have thrown TimeoutException.");
        } catch (TimeoutException e) {
            MatcherAssert.assertThat(e.getMessage(), Matchers.is("Could not create topics within 50 milliseconds. This can happen if the Kafka cluster is temporarily not available."));
        }
    }

    @Test
    public void shouldLogWhenTopicNotFoundAndNotThrowException() {
        this.mockAdminClient.addTopic(false, "test_topic", Collections.singletonList(new TopicPartitionInfo(0, this.broker1, this.cluster, Collections.emptyList())), (Map) null);
        RepartitionTopicConfig repartitionTopicConfig = new RepartitionTopicConfig("test_topic", Collections.emptyMap());
        repartitionTopicConfig.setNumberOfPartitions(1);
        RepartitionTopicConfig repartitionTopicConfig2 = new RepartitionTopicConfig("internal-topic", Collections.emptyMap());
        repartitionTopicConfig2.setNumberOfPartitions(1);
        HashMap hashMap = new HashMap();
        hashMap.put("test_topic", repartitionTopicConfig);
        hashMap.put("internal-topic", repartitionTopicConfig2);
        LogCaptureAppender createAndRegister = LogCaptureAppender.createAndRegister(InternalTopicManager.class);
        try {
            createAndRegister.setClassLogger(InternalTopicManager.class, Level.DEBUG);
            this.internalTopicManager.makeReady(hashMap);
            MatcherAssert.assertThat(createAndRegister.getMessages(), Matchers.hasItem("stream-thread [" + this.threadName + "] Topic internal-topic is unknown or not found, hence not existed yet.\nError message was: org.apache.kafka.common.errors.UnknownTopicOrPartitionException: Topic internal-topic not found."));
            if (createAndRegister != null) {
                createAndRegister.close();
            }
        } catch (Throwable th) {
            if (createAndRegister != null) {
                try {
                    createAndRegister.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void shouldCreateTopicWhenTopicLeaderNotAvailableAndThenTopicNotFound() {
        AdminClient adminClient = (AdminClient) Mockito.mock(AdminClient.class);
        InternalTopicManager internalTopicManager = new InternalTopicManager(this.time, adminClient, new StreamsConfig(this.config));
        KafkaFutureImpl kafkaFutureImpl = new KafkaFutureImpl();
        kafkaFutureImpl.completeExceptionally(new LeaderNotAvailableException("Leader Not Available!"));
        KafkaFutureImpl kafkaFutureImpl2 = new KafkaFutureImpl();
        kafkaFutureImpl2.completeExceptionally(new UnknownTopicOrPartitionException("Unknown Topic!"));
        KafkaFutureImpl kafkaFutureImpl3 = new KafkaFutureImpl();
        kafkaFutureImpl3.complete((CreateTopicsResult.TopicMetadataAndConfig) Mockito.mock(CreateTopicsResult.TopicMetadataAndConfig.class));
        Mockito.when(adminClient.describeTopics(Collections.singleton("test_topic"))).thenAnswer(invocationOnMock -> {
            return new MockDescribeTopicsResult(Collections.singletonMap("test_topic", kafkaFutureImpl));
        });
        Mockito.when(adminClient.describeTopics(Collections.singleton("test_topic"))).thenAnswer(invocationOnMock2 -> {
            return new MockDescribeTopicsResult(Collections.singletonMap("test_topic", kafkaFutureImpl2));
        });
        Mockito.when(adminClient.createTopics(Collections.singleton(new NewTopic("test_topic", Optional.of(1), Optional.of((short) 1)).configs(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("cleanup.policy", "delete"), Utils.mkEntry("message.timestamp.type", "CreateTime"), Utils.mkEntry("segment.bytes", "52428800"), Utils.mkEntry("retention.ms", "-1")}))))).thenAnswer(invocationOnMock3 -> {
            return new MockCreateTopicsResult(Collections.singletonMap("test_topic", kafkaFutureImpl3));
        });
        RepartitionTopicConfig repartitionTopicConfig = new RepartitionTopicConfig("test_topic", Collections.emptyMap());
        repartitionTopicConfig.setNumberOfPartitions(1);
        internalTopicManager.makeReady(Collections.singletonMap("test_topic", repartitionTopicConfig));
    }

    @Test
    public void shouldCompleteValidateWhenTopicLeaderNotAvailableAndThenDescribeSuccess() {
        AdminClient adminClient = (AdminClient) Mockito.mock(AdminClient.class);
        InternalTopicManager internalTopicManager = new InternalTopicManager(this.time, adminClient, new StreamsConfig(this.config));
        TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(0, this.broker1, Collections.singletonList(this.broker1), Collections.singletonList(this.broker1));
        KafkaFutureImpl kafkaFutureImpl = new KafkaFutureImpl();
        kafkaFutureImpl.completeExceptionally(new LeaderNotAvailableException("Leader Not Available!"));
        KafkaFutureImpl kafkaFutureImpl2 = new KafkaFutureImpl();
        kafkaFutureImpl2.complete(new TopicDescription("test_topic", false, Collections.singletonList(topicPartitionInfo), Collections.emptySet()));
        Mockito.when(adminClient.describeTopics(Collections.singleton("test_topic"))).thenAnswer(invocationOnMock -> {
            return new MockDescribeTopicsResult(Collections.singletonMap("test_topic", kafkaFutureImpl));
        });
        Mockito.when(adminClient.describeTopics(Collections.singleton("test_topic"))).thenAnswer(invocationOnMock2 -> {
            return new MockDescribeTopicsResult(Collections.singletonMap("test_topic", kafkaFutureImpl2));
        });
        RepartitionTopicConfig repartitionTopicConfig = new RepartitionTopicConfig("test_topic", Collections.emptyMap());
        repartitionTopicConfig.setNumberOfPartitions(1);
        internalTopicManager.makeReady(Collections.singletonMap("test_topic", repartitionTopicConfig));
    }

    @Test
    public void shouldThrowExceptionWhenKeepsTopicLeaderNotAvailable() {
        AdminClient adminClient = (AdminClient) Mockito.mock(AdminClient.class);
        InternalTopicManager internalTopicManager = new InternalTopicManager(new MockTime(((Integer) this.config.get(StreamsConfig.consumerPrefix("max.poll.interval.ms"))).intValue() / 15), adminClient, new StreamsConfig(this.config));
        KafkaFutureImpl kafkaFutureImpl = new KafkaFutureImpl();
        kafkaFutureImpl.completeExceptionally(new LeaderNotAvailableException("Leader Not Available!"));
        Mockito.when(adminClient.describeTopics(Collections.singleton("test_topic"))).thenAnswer(invocationOnMock -> {
            return new MockDescribeTopicsResult(Collections.singletonMap("test_topic", kafkaFutureImpl));
        });
        RepartitionTopicConfig repartitionTopicConfig = new RepartitionTopicConfig("test_topic", Collections.emptyMap());
        repartitionTopicConfig.setNumberOfPartitions(1);
        TimeoutException assertThrows = Assertions.assertThrows(TimeoutException.class, () -> {
            internalTopicManager.makeReady(Collections.singletonMap("test_topic", repartitionTopicConfig));
        });
        Assertions.assertNull(assertThrows.getCause());
        MatcherAssert.assertThat(assertThrows.getMessage(), Matchers.equalTo("Could not create topics within 50 milliseconds. This can happen if the Kafka cluster is temporarily not available."));
    }

    @Test
    public void shouldExhaustRetriesOnMarkedForDeletionTopic() {
        this.mockAdminClient.addTopic(false, "test_topic", Collections.singletonList(new TopicPartitionInfo(0, this.broker1, this.cluster, Collections.emptyList())), (Map) null);
        this.mockAdminClient.markTopicForDeletion("test_topic");
        InternalTopicManager internalTopicManager = new InternalTopicManager(new MockTime(((Integer) this.config.get(StreamsConfig.consumerPrefix("max.poll.interval.ms"))).intValue() / 15), this.mockAdminClient, new StreamsConfig(this.config));
        RepartitionTopicConfig repartitionTopicConfig = new RepartitionTopicConfig("test_topic", Collections.emptyMap());
        repartitionTopicConfig.setNumberOfPartitions(1);
        TimeoutException assertThrows = Assertions.assertThrows(TimeoutException.class, () -> {
            internalTopicManager.makeReady(Collections.singletonMap("test_topic", repartitionTopicConfig));
        });
        Assertions.assertNull(assertThrows.getCause());
        MatcherAssert.assertThat(assertThrows.getMessage(), Matchers.equalTo("Could not create topics within 50 milliseconds. This can happen if the Kafka cluster is temporarily not available."));
    }

    @Test
    public void shouldValidateSuccessfully() {
        setupTopicInMockAdminClient("test_topic", repartitionTopicConfig());
        setupTopicInMockAdminClient("test_topic_2", repartitionTopicConfig());
        InternalTopicManager.ValidationResult validate = this.internalTopicManager.validate(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("test_topic", setupRepartitionTopicConfig("test_topic", 1)), Utils.mkEntry("test_topic_2", setupRepartitionTopicConfig("test_topic_2", 1))}));
        MatcherAssert.assertThat(validate.missingTopics(), Matchers.empty());
        MatcherAssert.assertThat(validate.misconfigurationsForTopics(), Matchers.anEmptyMap());
    }

    @Test
    public void shouldValidateSuccessfullyWithEmptyInternalTopics() {
        setupTopicInMockAdminClient("test_topic", repartitionTopicConfig());
        InternalTopicManager.ValidationResult validate = this.internalTopicManager.validate(Collections.emptyMap());
        MatcherAssert.assertThat(validate.missingTopics(), Matchers.empty());
        MatcherAssert.assertThat(validate.misconfigurationsForTopics(), Matchers.anEmptyMap());
    }

    @Test
    public void shouldReportMissingTopics() {
        setupTopicInMockAdminClient("test_topic", repartitionTopicConfig());
        InternalTopicManager.ValidationResult validate = this.internalTopicManager.validate(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("test_topic", setupRepartitionTopicConfig("test_topic", 1)), Utils.mkEntry("missingTopic1", setupRepartitionTopicConfig("missingTopic1", 1)), Utils.mkEntry("missingTopic2", setupRepartitionTopicConfig("missingTopic2", 1))}));
        Set missingTopics = validate.missingTopics();
        MatcherAssert.assertThat(Integer.valueOf(missingTopics.size()), Matchers.is(2));
        MatcherAssert.assertThat(missingTopics, Matchers.hasItem("missingTopic1"));
        MatcherAssert.assertThat(missingTopics, Matchers.hasItem("missingTopic2"));
        MatcherAssert.assertThat(validate.misconfigurationsForTopics(), Matchers.anEmptyMap());
    }

    @Test
    public void shouldReportMisconfigurationsOfPartitionCount() {
        setupTopicInMockAdminClient("test_topic", repartitionTopicConfig());
        setupTopicInMockAdminClient("test_topic_2", repartitionTopicConfig());
        setupTopicInMockAdminClient("test_topic_3", repartitionTopicConfig());
        InternalTopicManager.ValidationResult validate = this.internalTopicManager.validate(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("test_topic", setupRepartitionTopicConfig("test_topic", 2)), Utils.mkEntry("test_topic_2", setupRepartitionTopicConfig("test_topic_2", 3)), Utils.mkEntry("test_topic_3", setupRepartitionTopicConfig("test_topic_3", 1))}));
        Map misconfigurationsForTopics = validate.misconfigurationsForTopics();
        MatcherAssert.assertThat(validate.missingTopics(), Matchers.empty());
        MatcherAssert.assertThat(Integer.valueOf(misconfigurationsForTopics.size()), Matchers.is(2));
        MatcherAssert.assertThat(misconfigurationsForTopics, Matchers.hasKey("test_topic"));
        MatcherAssert.assertThat(Integer.valueOf(((List) misconfigurationsForTopics.get("test_topic")).size()), Matchers.is(1));
        MatcherAssert.assertThat((String) ((List) misconfigurationsForTopics.get("test_topic")).get(0), Matchers.is("Internal topic test_topic requires 2 partitions, but the existing topic on the broker has 1 partitions."));
        MatcherAssert.assertThat(misconfigurationsForTopics, Matchers.hasKey("test_topic_2"));
        MatcherAssert.assertThat(Integer.valueOf(((List) misconfigurationsForTopics.get("test_topic_2")).size()), Matchers.is(1));
        MatcherAssert.assertThat((String) ((List) misconfigurationsForTopics.get("test_topic_2")).get(0), Matchers.is("Internal topic test_topic_2 requires 3 partitions, but the existing topic on the broker has 1 partitions."));
        MatcherAssert.assertThat(misconfigurationsForTopics, Matchers.not(Matchers.hasKey("test_topic_3")));
    }

    @Test
    public void shouldReportMisconfigurationsOfCleanupPolicyForUnwindowedUnversionedChangelogTopics() {
        Map<String, String> unwindowedUnversionedChangelogConfig = unwindowedUnversionedChangelogConfig();
        unwindowedUnversionedChangelogConfig.put("cleanup.policy", "delete");
        setupTopicInMockAdminClient("test_topic", unwindowedUnversionedChangelogConfig);
        Map<String, String> unwindowedUnversionedChangelogConfig2 = unwindowedUnversionedChangelogConfig();
        unwindowedUnversionedChangelogConfig2.put("cleanup.policy", "compact,delete");
        setupTopicInMockAdminClient("test_topic_2", unwindowedUnversionedChangelogConfig2);
        setupTopicInMockAdminClient("test_topic_3", unwindowedUnversionedChangelogConfig());
        InternalTopicManager.ValidationResult validate = this.internalTopicManager.validate(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("test_topic", setupUnwindowedUnversionedChangelogTopicConfig("test_topic", 1)), Utils.mkEntry("test_topic_2", setupUnwindowedUnversionedChangelogTopicConfig("test_topic_2", 1)), Utils.mkEntry("test_topic_3", setupUnwindowedUnversionedChangelogTopicConfig("test_topic_3", 1))}));
        Map misconfigurationsForTopics = validate.misconfigurationsForTopics();
        MatcherAssert.assertThat(validate.missingTopics(), Matchers.empty());
        MatcherAssert.assertThat(Integer.valueOf(misconfigurationsForTopics.size()), Matchers.is(2));
        MatcherAssert.assertThat(misconfigurationsForTopics, Matchers.hasKey("test_topic"));
        MatcherAssert.assertThat(Integer.valueOf(((List) misconfigurationsForTopics.get("test_topic")).size()), Matchers.is(1));
        MatcherAssert.assertThat((String) ((List) misconfigurationsForTopics.get("test_topic")).get(0), Matchers.is("Cleanup policy (cleanup.policy) of existing internal topic test_topic should not contain \"delete\"."));
        MatcherAssert.assertThat(misconfigurationsForTopics, Matchers.hasKey("test_topic_2"));
        MatcherAssert.assertThat(Integer.valueOf(((List) misconfigurationsForTopics.get("test_topic_2")).size()), Matchers.is(1));
        MatcherAssert.assertThat((String) ((List) misconfigurationsForTopics.get("test_topic_2")).get(0), Matchers.is("Cleanup policy (cleanup.policy) of existing internal topic test_topic_2 should not contain \"delete\"."));
        MatcherAssert.assertThat(misconfigurationsForTopics, Matchers.not(Matchers.hasKey("test_topic_3")));
    }

    @Test
    public void shouldReportMisconfigurationsOfCleanupPolicyForWindowedChangelogTopics() {
        setupTopicInMockAdminClient("test_topic", windowedChangelogConfig(1000L));
        setupTopicInMockAdminClient("test_topic_2", windowedChangelogConfig(900L));
        Map<String, String> windowedChangelogConfig = windowedChangelogConfig(1000L);
        windowedChangelogConfig.put("cleanup.policy", "compact");
        setupTopicInMockAdminClient("test_topic_3", windowedChangelogConfig);
        Map<String, String> windowedChangelogConfig2 = windowedChangelogConfig(900L);
        windowedChangelogConfig2.put("cleanup.policy", "delete");
        setupTopicInMockAdminClient("test_topic_4", windowedChangelogConfig2);
        Map<String, String> windowedChangelogConfig3 = windowedChangelogConfig(1000L);
        windowedChangelogConfig3.put("retention.bytes", "1024");
        setupTopicInMockAdminClient("test_topic_5", windowedChangelogConfig3);
        InternalTopicManager.ValidationResult validate = this.internalTopicManager.validate(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("test_topic", setupWindowedChangelogTopicConfig("test_topic", 1, 1000L)), Utils.mkEntry("test_topic_2", setupWindowedChangelogTopicConfig("test_topic_2", 1, 1000L)), Utils.mkEntry("test_topic_3", setupWindowedChangelogTopicConfig("test_topic_3", 1, 1000L)), Utils.mkEntry("test_topic_4", setupWindowedChangelogTopicConfig("test_topic_4", 1, 1000L)), Utils.mkEntry("test_topic_5", setupWindowedChangelogTopicConfig("test_topic_5", 1, 1000L))}));
        Map misconfigurationsForTopics = validate.misconfigurationsForTopics();
        MatcherAssert.assertThat(validate.missingTopics(), Matchers.empty());
        MatcherAssert.assertThat(Integer.valueOf(misconfigurationsForTopics.size()), Matchers.is(3));
        MatcherAssert.assertThat(misconfigurationsForTopics, Matchers.hasKey("test_topic_2"));
        MatcherAssert.assertThat(Integer.valueOf(((List) misconfigurationsForTopics.get("test_topic_2")).size()), Matchers.is(1));
        MatcherAssert.assertThat((String) ((List) misconfigurationsForTopics.get("test_topic_2")).get(0), Matchers.is("Retention time (retention.ms) of existing internal topic test_topic_2 is 900 but should be 1000 or larger."));
        MatcherAssert.assertThat(misconfigurationsForTopics, Matchers.hasKey("test_topic_4"));
        MatcherAssert.assertThat(Integer.valueOf(((List) misconfigurationsForTopics.get("test_topic_4")).size()), Matchers.is(1));
        MatcherAssert.assertThat((String) ((List) misconfigurationsForTopics.get("test_topic_4")).get(0), Matchers.is("Retention time (retention.ms) of existing internal topic test_topic_4 is 900 but should be 1000 or larger."));
        MatcherAssert.assertThat(misconfigurationsForTopics, Matchers.hasKey("test_topic_5"));
        MatcherAssert.assertThat(Integer.valueOf(((List) misconfigurationsForTopics.get("test_topic_5")).size()), Matchers.is(1));
        MatcherAssert.assertThat((String) ((List) misconfigurationsForTopics.get("test_topic_5")).get(0), Matchers.is("Retention byte (retention.bytes) of existing internal topic test_topic_5 is set but it should be unset."));
        MatcherAssert.assertThat(misconfigurationsForTopics, Matchers.not(Matchers.hasKey("test_topic")));
        MatcherAssert.assertThat(misconfigurationsForTopics, Matchers.not(Matchers.hasKey("test_topic_3")));
    }

    @Test
    public void shouldReportMisconfigurationsOfCleanupPolicyForVersionedChangelogTopics() {
        setupTopicInMockAdminClient("test_topic", versionedChangelogConfig(1000L));
        setupTopicInMockAdminClient("test_topic_2", versionedChangelogConfig(900L));
        Map<String, String> versionedChangelogConfig = versionedChangelogConfig(1000L);
        versionedChangelogConfig.put("cleanup.policy", "delete");
        setupTopicInMockAdminClient("test_topic_3", versionedChangelogConfig);
        Map<String, String> versionedChangelogConfig2 = versionedChangelogConfig(1000L);
        versionedChangelogConfig2.put("cleanup.policy", "compactdelete");
        setupTopicInMockAdminClient("test_topic_4", versionedChangelogConfig2);
        InternalTopicManager.ValidationResult validate = this.internalTopicManager.validate(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("test_topic", setupVersionedChangelogTopicConfig("test_topic", 1, 1000L)), Utils.mkEntry("test_topic_2", setupVersionedChangelogTopicConfig("test_topic_2", 1, 1000L)), Utils.mkEntry("test_topic_3", setupVersionedChangelogTopicConfig("test_topic_3", 1, 1000L)), Utils.mkEntry("test_topic_4", setupVersionedChangelogTopicConfig("test_topic_4", 1, 1000L))}));
        Map misconfigurationsForTopics = validate.misconfigurationsForTopics();
        MatcherAssert.assertThat(validate.missingTopics(), Matchers.empty());
        MatcherAssert.assertThat(Integer.valueOf(misconfigurationsForTopics.size()), Matchers.is(3));
        MatcherAssert.assertThat(misconfigurationsForTopics, Matchers.hasKey("test_topic_2"));
        MatcherAssert.assertThat(Integer.valueOf(((List) misconfigurationsForTopics.get("test_topic_2")).size()), Matchers.is(1));
        MatcherAssert.assertThat((String) ((List) misconfigurationsForTopics.get("test_topic_2")).get(0), Matchers.is("Min compaction lag (min.compaction.lag.ms) of existing internal topic test_topic_2 is 900 but should be 1000 or larger."));
        MatcherAssert.assertThat(misconfigurationsForTopics, Matchers.hasKey("test_topic_3"));
        MatcherAssert.assertThat(Integer.valueOf(((List) misconfigurationsForTopics.get("test_topic_3")).size()), Matchers.is(1));
        MatcherAssert.assertThat((String) ((List) misconfigurationsForTopics.get("test_topic_3")).get(0), Matchers.is("Cleanup policy (cleanup.policy) of existing internal topic test_topic_3 should not contain \"delete\"."));
        MatcherAssert.assertThat(misconfigurationsForTopics, Matchers.hasKey("test_topic_4"));
        MatcherAssert.assertThat(Integer.valueOf(((List) misconfigurationsForTopics.get("test_topic_4")).size()), Matchers.is(1));
        MatcherAssert.assertThat((String) ((List) misconfigurationsForTopics.get("test_topic_4")).get(0), Matchers.is("Cleanup policy (cleanup.policy) of existing internal topic test_topic_4 should not contain \"delete\"."));
        MatcherAssert.assertThat(misconfigurationsForTopics, Matchers.not(Matchers.hasKey("test_topic")));
    }

    @Test
    public void shouldReportMisconfigurationsOfCleanupPolicyForRepartitionTopics() {
        setupTopicInMockAdminClient("test_topic", repartitionTopicConfig());
        Map<String, String> repartitionTopicConfig = repartitionTopicConfig();
        repartitionTopicConfig.put("cleanup.policy", "compact");
        setupTopicInMockAdminClient("test_topic_2", repartitionTopicConfig);
        Map<String, String> repartitionTopicConfig2 = repartitionTopicConfig();
        repartitionTopicConfig2.put("cleanup.policy", "compact,delete");
        setupTopicInMockAdminClient("test_topic_3", repartitionTopicConfig2);
        Map<String, String> repartitionTopicConfig3 = repartitionTopicConfig();
        repartitionTopicConfig3.put("retention.ms", String.valueOf(1000L));
        setupTopicInMockAdminClient("test_topic_4", repartitionTopicConfig3);
        Map<String, String> repartitionTopicConfig4 = repartitionTopicConfig();
        repartitionTopicConfig4.put("retention.bytes", "1024");
        setupTopicInMockAdminClient("test_topic_5", repartitionTopicConfig4);
        InternalTopicManager.ValidationResult validate = this.internalTopicManager.validate(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("test_topic", setupRepartitionTopicConfig("test_topic", 1)), Utils.mkEntry("test_topic_2", setupRepartitionTopicConfig("test_topic_2", 1)), Utils.mkEntry("test_topic_3", setupRepartitionTopicConfig("test_topic_3", 1)), Utils.mkEntry("test_topic_4", setupRepartitionTopicConfig("test_topic_4", 1)), Utils.mkEntry("test_topic_5", setupRepartitionTopicConfig("test_topic_5", 1))}));
        Map misconfigurationsForTopics = validate.misconfigurationsForTopics();
        MatcherAssert.assertThat(validate.missingTopics(), Matchers.empty());
        MatcherAssert.assertThat(Integer.valueOf(misconfigurationsForTopics.size()), Matchers.is(4));
        MatcherAssert.assertThat(misconfigurationsForTopics, Matchers.hasKey("test_topic_2"));
        MatcherAssert.assertThat(Integer.valueOf(((List) misconfigurationsForTopics.get("test_topic_2")).size()), Matchers.is(1));
        MatcherAssert.assertThat((String) ((List) misconfigurationsForTopics.get("test_topic_2")).get(0), Matchers.is("Cleanup policy (cleanup.policy) of existing internal topic test_topic_2 should not contain \"compact\"."));
        MatcherAssert.assertThat(misconfigurationsForTopics, Matchers.hasKey("test_topic_3"));
        MatcherAssert.assertThat(Integer.valueOf(((List) misconfigurationsForTopics.get("test_topic_3")).size()), Matchers.is(1));
        MatcherAssert.assertThat((String) ((List) misconfigurationsForTopics.get("test_topic_3")).get(0), Matchers.is("Cleanup policy (cleanup.policy) of existing internal topic test_topic_3 should not contain \"compact\"."));
        MatcherAssert.assertThat(misconfigurationsForTopics, Matchers.hasKey("test_topic_4"));
        MatcherAssert.assertThat(Integer.valueOf(((List) misconfigurationsForTopics.get("test_topic_4")).size()), Matchers.is(1));
        MatcherAssert.assertThat((String) ((List) misconfigurationsForTopics.get("test_topic_4")).get(0), Matchers.is("Retention time (retention.ms) of existing internal topic test_topic_4 is 1000 but should be -1."));
        MatcherAssert.assertThat(misconfigurationsForTopics, Matchers.hasKey("test_topic_5"));
        MatcherAssert.assertThat(Integer.valueOf(((List) misconfigurationsForTopics.get("test_topic_5")).size()), Matchers.is(1));
        MatcherAssert.assertThat((String) ((List) misconfigurationsForTopics.get("test_topic_5")).get(0), Matchers.is("Retention byte (retention.bytes) of existing internal topic test_topic_5 is set but it should be unset."));
    }

    @Test
    public void shouldReportMultipleMisconfigurationsForSameTopic() {
        Map<String, String> windowedChangelogConfig = windowedChangelogConfig(900L);
        windowedChangelogConfig.put("retention.bytes", "1024");
        setupTopicInMockAdminClient("test_topic", windowedChangelogConfig);
        InternalTopicManager.ValidationResult validate = this.internalTopicManager.validate(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("test_topic", setupWindowedChangelogTopicConfig("test_topic", 1, 1000L))}));
        Map misconfigurationsForTopics = validate.misconfigurationsForTopics();
        MatcherAssert.assertThat(validate.missingTopics(), Matchers.empty());
        MatcherAssert.assertThat(Integer.valueOf(misconfigurationsForTopics.size()), Matchers.is(1));
        MatcherAssert.assertThat(misconfigurationsForTopics, Matchers.hasKey("test_topic"));
        MatcherAssert.assertThat(Integer.valueOf(((List) misconfigurationsForTopics.get("test_topic")).size()), Matchers.is(2));
        MatcherAssert.assertThat((String) ((List) misconfigurationsForTopics.get("test_topic")).get(0), Matchers.is("Retention time (retention.ms) of existing internal topic test_topic is 900 but should be 1000 or larger."));
        MatcherAssert.assertThat((String) ((List) misconfigurationsForTopics.get("test_topic")).get(1), Matchers.is("Retention byte (retention.bytes) of existing internal topic test_topic is set but it should be unset."));
    }

    @Test
    public void shouldThrowWhenPartitionCountUnknown() {
        setupTopicInMockAdminClient("test_topic", repartitionTopicConfig());
        RepartitionTopicConfig repartitionTopicConfig = new RepartitionTopicConfig("test_topic", Collections.emptyMap());
        Assertions.assertThrows(IllegalStateException.class, () -> {
            this.internalTopicManager.validate(Collections.singletonMap("test_topic", repartitionTopicConfig));
        });
    }

    @Test
    public void shouldNotThrowExceptionIfTopicExistsWithDifferentReplication() {
        setupTopicInMockAdminClient("test_topic", repartitionTopicConfig());
        InternalTopicManager.ValidationResult validate = new InternalTopicManager(this.time, this.mockAdminClient, new StreamsConfig(this.config)).validate(Collections.singletonMap("test_topic", setupRepartitionTopicConfig("test_topic", 1)));
        MatcherAssert.assertThat(validate.missingTopics(), Matchers.empty());
        MatcherAssert.assertThat(validate.misconfigurationsForTopics(), Matchers.anEmptyMap());
    }

    @Test
    public void shouldRetryWhenCallsThrowTimeoutExceptionDuringValidation() {
        setupTopicInMockAdminClient("test_topic", repartitionTopicConfig());
        this.mockAdminClient.timeoutNextRequest(2);
        InternalTopicManager.ValidationResult validate = this.internalTopicManager.validate(Collections.singletonMap("test_topic", setupRepartitionTopicConfig("test_topic", 1)));
        MatcherAssert.assertThat(validate.missingTopics(), Matchers.empty());
        MatcherAssert.assertThat(validate.misconfigurationsForTopics(), Matchers.anEmptyMap());
    }

    @Test
    public void shouldOnlyRetryDescribeTopicsWhenDescribeTopicsThrowsLeaderNotAvailableExceptionDuringValidation() {
        AdminClient adminClient = (AdminClient) Mockito.mock(AdminClient.class);
        InternalTopicManager internalTopicManager = new InternalTopicManager(this.time, adminClient, new StreamsConfig(this.config));
        KafkaFutureImpl kafkaFutureImpl = new KafkaFutureImpl();
        kafkaFutureImpl.completeExceptionally(new LeaderNotAvailableException("Leader Not Available!"));
        KafkaFutureImpl kafkaFutureImpl2 = new KafkaFutureImpl();
        kafkaFutureImpl2.complete(new TopicDescription("test_topic", false, Collections.singletonList(new TopicPartitionInfo(0, this.broker1, this.cluster, Collections.emptyList()))));
        Mockito.when(adminClient.describeTopics(Collections.singleton("test_topic"))).thenAnswer(invocationOnMock -> {
            return new MockDescribeTopicsResult(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("test_topic", kafkaFutureImpl)}));
        }).thenAnswer(invocationOnMock2 -> {
            return new MockDescribeTopicsResult(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("test_topic", kafkaFutureImpl2)}));
        });
        KafkaFutureImpl kafkaFutureImpl3 = new KafkaFutureImpl();
        kafkaFutureImpl3.complete(new Config((Collection) repartitionTopicConfig().entrySet().stream().map(entry -> {
            return new ConfigEntry((String) entry.getKey(), (String) entry.getValue());
        }).collect(Collectors.toSet())));
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, "test_topic");
        Mockito.when(adminClient.describeConfigs(Collections.singleton(configResource))).thenAnswer(invocationOnMock3 -> {
            return new MockDescribeConfigsResult(Utils.mkMap(new Map.Entry[]{Utils.mkEntry(configResource, kafkaFutureImpl3)}));
        });
        InternalTopicManager.ValidationResult validate = internalTopicManager.validate(Collections.singletonMap("test_topic", setupRepartitionTopicConfig("test_topic", 1)));
        MatcherAssert.assertThat(validate.missingTopics(), Matchers.empty());
        MatcherAssert.assertThat(validate.misconfigurationsForTopics(), Matchers.anEmptyMap());
    }

    @Test
    public void shouldOnlyRetryDescribeConfigsWhenDescribeConfigsThrowsLeaderNotAvailableExceptionDuringValidation() {
        AdminClient adminClient = (AdminClient) Mockito.mock(AdminClient.class);
        InternalTopicManager internalTopicManager = new InternalTopicManager(this.time, adminClient, new StreamsConfig(this.config));
        KafkaFutureImpl kafkaFutureImpl = new KafkaFutureImpl();
        kafkaFutureImpl.complete(new TopicDescription("test_topic", false, Collections.singletonList(new TopicPartitionInfo(0, this.broker1, this.cluster, Collections.emptyList()))));
        Mockito.when(adminClient.describeTopics(Collections.singleton("test_topic"))).thenAnswer(invocationOnMock -> {
            return new MockDescribeTopicsResult(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("test_topic", kafkaFutureImpl)}));
        });
        KafkaFutureImpl kafkaFutureImpl2 = new KafkaFutureImpl();
        kafkaFutureImpl2.completeExceptionally(new LeaderNotAvailableException("Leader Not Available!"));
        KafkaFutureImpl kafkaFutureImpl3 = new KafkaFutureImpl();
        kafkaFutureImpl3.complete(new Config((Collection) repartitionTopicConfig().entrySet().stream().map(entry -> {
            return new ConfigEntry((String) entry.getKey(), (String) entry.getValue());
        }).collect(Collectors.toSet())));
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, "test_topic");
        Mockito.when(adminClient.describeConfigs(Collections.singleton(configResource))).thenAnswer(invocationOnMock2 -> {
            return new MockDescribeConfigsResult(Utils.mkMap(new Map.Entry[]{Utils.mkEntry(configResource, kafkaFutureImpl2)}));
        }).thenAnswer(invocationOnMock3 -> {
            return new MockDescribeConfigsResult(Utils.mkMap(new Map.Entry[]{Utils.mkEntry(configResource, kafkaFutureImpl3)}));
        });
        InternalTopicManager.ValidationResult validate = internalTopicManager.validate(Collections.singletonMap("test_topic", setupRepartitionTopicConfig("test_topic", 1)));
        MatcherAssert.assertThat(validate.missingTopics(), Matchers.empty());
        MatcherAssert.assertThat(validate.misconfigurationsForTopics(), Matchers.anEmptyMap());
    }

    @Test
    public void shouldOnlyRetryNotSuccessfulFuturesDuringValidation() {
        AdminClient adminClient = (AdminClient) Mockito.mock(AdminClient.class);
        InternalTopicManager internalTopicManager = new InternalTopicManager(this.time, adminClient, new StreamsConfig(this.config));
        KafkaFutureImpl kafkaFutureImpl = new KafkaFutureImpl();
        kafkaFutureImpl.completeExceptionally(new LeaderNotAvailableException("Leader Not Available!"));
        KafkaFutureImpl kafkaFutureImpl2 = new KafkaFutureImpl();
        kafkaFutureImpl2.complete(new TopicDescription("test_topic", false, Collections.singletonList(new TopicPartitionInfo(0, this.broker1, this.cluster, Collections.emptyList()))));
        KafkaFutureImpl kafkaFutureImpl3 = new KafkaFutureImpl();
        kafkaFutureImpl3.complete(new TopicDescription("test_topic_2", false, Collections.singletonList(new TopicPartitionInfo(0, this.broker1, this.cluster, Collections.emptyList()))));
        Mockito.when(adminClient.describeTopics(Utils.mkSet(new String[]{"test_topic", "test_topic_2"}))).thenAnswer(invocationOnMock -> {
            return new MockDescribeTopicsResult(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("test_topic", kafkaFutureImpl2), Utils.mkEntry("test_topic_2", kafkaFutureImpl)}));
        });
        Mockito.when(adminClient.describeTopics(Utils.mkSet(new String[]{"test_topic_2"}))).thenAnswer(invocationOnMock2 -> {
            return new MockDescribeTopicsResult(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("test_topic_2", kafkaFutureImpl3)}));
        });
        KafkaFutureImpl kafkaFutureImpl4 = new KafkaFutureImpl();
        kafkaFutureImpl4.complete(new Config((Collection) repartitionTopicConfig().entrySet().stream().map(entry -> {
            return new ConfigEntry((String) entry.getKey(), (String) entry.getValue());
        }).collect(Collectors.toSet())));
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, "test_topic");
        ConfigResource configResource2 = new ConfigResource(ConfigResource.Type.TOPIC, "test_topic_2");
        Mockito.when(adminClient.describeConfigs(Utils.mkSet(new ConfigResource[]{configResource, configResource2}))).thenAnswer(invocationOnMock3 -> {
            return new MockDescribeConfigsResult(Utils.mkMap(new Map.Entry[]{Utils.mkEntry(configResource, kafkaFutureImpl4), Utils.mkEntry(configResource2, kafkaFutureImpl4)}));
        });
        InternalTopicManager.ValidationResult validate = internalTopicManager.validate(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("test_topic", setupRepartitionTopicConfig("test_topic", 1)), Utils.mkEntry("test_topic_2", setupRepartitionTopicConfig("test_topic_2", 1))}));
        MatcherAssert.assertThat(validate.missingTopics(), Matchers.empty());
        MatcherAssert.assertThat(validate.misconfigurationsForTopics(), Matchers.anEmptyMap());
    }

    @Test
    public void shouldThrowWhenDescribeTopicsThrowsUnexpectedExceptionDuringValidation() {
        AdminClient adminClient = (AdminClient) Mockito.mock(AdminClient.class);
        InternalTopicManager internalTopicManager = new InternalTopicManager(this.time, adminClient, new StreamsConfig(this.config));
        KafkaFutureImpl kafkaFutureImpl = new KafkaFutureImpl();
        kafkaFutureImpl.completeExceptionally(new IllegalStateException("Nobody expects the Spanish inquisition"));
        Mockito.when(adminClient.describeTopics(Collections.singleton("test_topic"))).thenAnswer(invocationOnMock -> {
            return new MockDescribeTopicsResult(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("test_topic", kafkaFutureImpl)}));
        });
        InternalTopicConfig internalTopicConfig = setupRepartitionTopicConfig("test_topic", 1);
        Assertions.assertThrows(Throwable.class, () -> {
            internalTopicManager.validate(Collections.singletonMap("test_topic", internalTopicConfig));
        });
    }

    @Test
    public void shouldThrowWhenDescribeConfigsThrowsUnexpectedExceptionDuringValidation() {
        AdminClient adminClient = (AdminClient) Mockito.mock(AdminClient.class);
        InternalTopicManager internalTopicManager = new InternalTopicManager(this.time, adminClient, new StreamsConfig(this.config));
        Mockito.when(adminClient.describeTopics(Collections.singleton("test_topic"))).thenAnswer(invocationOnMock -> {
            return new MockDescribeTopicsResult(Utils.mkMap(new Map.Entry[0]));
        });
        KafkaFutureImpl kafkaFutureImpl = new KafkaFutureImpl();
        kafkaFutureImpl.completeExceptionally(new IllegalStateException("Nobody expects the Spanish inquisition"));
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, "test_topic");
        Mockito.when(adminClient.describeConfigs(Collections.singleton(configResource))).thenAnswer(invocationOnMock2 -> {
            return new MockDescribeConfigsResult(Utils.mkMap(new Map.Entry[]{Utils.mkEntry(configResource, kafkaFutureImpl)}));
        });
        InternalTopicConfig internalTopicConfig = setupRepartitionTopicConfig("test_topic", 1);
        Assertions.assertThrows(Throwable.class, () -> {
            internalTopicManager.validate(Collections.singletonMap("test_topic", internalTopicConfig));
        });
    }

    @Test
    public void shouldThrowWhenTopicDescriptionsDoNotContainTopicDuringValidation() {
        AdminClient adminClient = (AdminClient) Mockito.mock(AdminClient.class);
        InternalTopicManager internalTopicManager = new InternalTopicManager(this.time, adminClient, new StreamsConfig(this.config));
        KafkaFutureImpl kafkaFutureImpl = new KafkaFutureImpl();
        kafkaFutureImpl.complete(new TopicDescription("test_topic", false, Collections.singletonList(new TopicPartitionInfo(0, this.broker1, this.cluster, Collections.emptyList()))));
        Mockito.when(adminClient.describeTopics(Collections.singleton("test_topic"))).thenAnswer(invocationOnMock -> {
            return new MockDescribeTopicsResult(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("test_topic_2", kafkaFutureImpl)}));
        });
        KafkaFutureImpl kafkaFutureImpl2 = new KafkaFutureImpl();
        kafkaFutureImpl2.complete(new Config(Collections.emptySet()));
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, "test_topic");
        Mockito.when(adminClient.describeConfigs(Collections.singleton(configResource))).thenAnswer(invocationOnMock2 -> {
            return new MockDescribeConfigsResult(Utils.mkMap(new Map.Entry[]{Utils.mkEntry(configResource, kafkaFutureImpl2)}));
        });
        InternalTopicConfig internalTopicConfig = setupRepartitionTopicConfig("test_topic", 1);
        Assertions.assertThrows(IllegalStateException.class, () -> {
            internalTopicManager.validate(Collections.singletonMap("test_topic", internalTopicConfig));
        });
    }

    @Test
    public void shouldThrowWhenConfigDescriptionsDoNotContainTopicDuringValidation() {
        AdminClient adminClient = (AdminClient) Mockito.mock(AdminClient.class);
        InternalTopicManager internalTopicManager = new InternalTopicManager(this.time, adminClient, new StreamsConfig(this.config));
        KafkaFutureImpl kafkaFutureImpl = new KafkaFutureImpl();
        kafkaFutureImpl.complete(new TopicDescription("test_topic", false, Collections.singletonList(new TopicPartitionInfo(0, this.broker1, this.cluster, Collections.emptyList()))));
        Mockito.when(adminClient.describeTopics(Collections.singleton("test_topic"))).thenAnswer(invocationOnMock -> {
            return new MockDescribeTopicsResult(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("test_topic", kafkaFutureImpl)}));
        });
        KafkaFutureImpl kafkaFutureImpl2 = new KafkaFutureImpl();
        kafkaFutureImpl2.complete(new Config(Collections.emptySet()));
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, "test_topic");
        ConfigResource configResource2 = new ConfigResource(ConfigResource.Type.TOPIC, "test_topic_2");
        Mockito.when(adminClient.describeConfigs(Collections.singleton(configResource))).thenAnswer(invocationOnMock2 -> {
            return new MockDescribeConfigsResult(Utils.mkMap(new Map.Entry[]{Utils.mkEntry(configResource2, kafkaFutureImpl2)}));
        });
        InternalTopicConfig internalTopicConfig = setupRepartitionTopicConfig("test_topic", 1);
        Assertions.assertThrows(IllegalStateException.class, () -> {
            internalTopicManager.validate(Collections.singletonMap("test_topic", internalTopicConfig));
        });
    }

    @Test
    public void shouldThrowWhenConfigDescriptionsDoNotCleanupPolicyForUnwindowedUnversionedConfigDuringValidation() {
        shouldThrowWhenConfigDescriptionsDoNotContainConfigDuringValidation(setupUnwindowedUnversionedChangelogTopicConfig("test_topic", 1), configWithoutKey(unwindowedUnversionedChangelogConfig(), "cleanup.policy"));
    }

    @Test
    public void shouldThrowWhenConfigDescriptionsDoNotContainCleanupPolicyForWindowedConfigDuringValidation() {
        shouldThrowWhenConfigDescriptionsDoNotContainConfigDuringValidation(setupWindowedChangelogTopicConfig("test_topic", 1, 1000L), configWithoutKey(windowedChangelogConfig(1000L), "cleanup.policy"));
    }

    @Test
    public void shouldThrowWhenConfigDescriptionsDoNotContainRetentionMsForWindowedConfigDuringValidation() {
        shouldThrowWhenConfigDescriptionsDoNotContainConfigDuringValidation(setupWindowedChangelogTopicConfig("test_topic", 1, 1000L), configWithoutKey(windowedChangelogConfig(1000L), "retention.ms"));
    }

    @Test
    public void shouldThrowWhenConfigDescriptionsDoNotContainRetentionBytesForWindowedConfigDuringValidation() {
        shouldThrowWhenConfigDescriptionsDoNotContainConfigDuringValidation(setupWindowedChangelogTopicConfig("test_topic", 1, 1000L), configWithoutKey(windowedChangelogConfig(1000L), "retention.bytes"));
    }

    @Test
    public void shouldThrowWhenConfigDescriptionsDoNotContainCleanupPolicyForRepartitionConfigDuringValidation() {
        shouldThrowWhenConfigDescriptionsDoNotContainConfigDuringValidation(setupRepartitionTopicConfig("test_topic", 1), configWithoutKey(repartitionTopicConfig(), "cleanup.policy"));
    }

    @Test
    public void shouldThrowWhenConfigDescriptionsDoNotContainRetentionMsForRepartitionConfigDuringValidation() {
        shouldThrowWhenConfigDescriptionsDoNotContainConfigDuringValidation(setupRepartitionTopicConfig("test_topic", 1), configWithoutKey(repartitionTopicConfig(), "retention.ms"));
    }

    @Test
    public void shouldThrowWhenConfigDescriptionsDoNotContainRetentionBytesForRepartitionConfigDuringValidation() {
        shouldThrowWhenConfigDescriptionsDoNotContainConfigDuringValidation(setupRepartitionTopicConfig("test_topic", 1), configWithoutKey(repartitionTopicConfig(), "retention.bytes"));
    }

    private Config configWithoutKey(Map<String, String> map, String str) {
        return new Config((Collection) map.entrySet().stream().filter(entry -> {
            return !((String) entry.getKey()).equals(str);
        }).map(entry2 -> {
            return new ConfigEntry((String) entry2.getKey(), (String) entry2.getValue());
        }).collect(Collectors.toSet()));
    }

    private void shouldThrowWhenConfigDescriptionsDoNotContainConfigDuringValidation(InternalTopicConfig internalTopicConfig, Config config) {
        AdminClient adminClient = (AdminClient) Mockito.mock(AdminClient.class);
        InternalTopicManager internalTopicManager = new InternalTopicManager(this.time, adminClient, new StreamsConfig(this.config));
        KafkaFutureImpl kafkaFutureImpl = new KafkaFutureImpl();
        kafkaFutureImpl.complete(new TopicDescription("test_topic", false, Collections.singletonList(new TopicPartitionInfo(0, this.broker1, this.cluster, Collections.emptyList()))));
        Mockito.when(adminClient.describeTopics(Collections.singleton("test_topic"))).thenAnswer(invocationOnMock -> {
            return new MockDescribeTopicsResult(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("test_topic", kafkaFutureImpl)}));
        });
        KafkaFutureImpl kafkaFutureImpl2 = new KafkaFutureImpl();
        kafkaFutureImpl2.complete(config);
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, "test_topic");
        Mockito.when(adminClient.describeConfigs(Collections.singleton(configResource))).thenAnswer(invocationOnMock2 -> {
            return new MockDescribeConfigsResult(Utils.mkMap(new Map.Entry[]{Utils.mkEntry(configResource, kafkaFutureImpl2)}));
        });
        Assertions.assertThrows(IllegalStateException.class, () -> {
            internalTopicManager.validate(Collections.singletonMap("test_topic", internalTopicConfig));
        });
    }

    @Test
    public void shouldThrowTimeoutExceptionWhenTimeoutIsExceededDuringValidation() {
        AdminClient adminClient = (AdminClient) Mockito.mock(AdminClient.class);
        InternalTopicManager internalTopicManager = new InternalTopicManager(new MockTime(((Integer) this.config.get(StreamsConfig.consumerPrefix("max.poll.interval.ms"))).intValue() / 3), adminClient, new StreamsConfig(this.config));
        KafkaFutureImpl kafkaFutureImpl = new KafkaFutureImpl();
        kafkaFutureImpl.completeExceptionally(new TimeoutException());
        Mockito.when(adminClient.describeTopics(Collections.singleton("test_topic"))).thenAnswer(invocationOnMock -> {
            return new MockDescribeTopicsResult(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("test_topic", kafkaFutureImpl)}));
        });
        KafkaFutureImpl kafkaFutureImpl2 = new KafkaFutureImpl();
        kafkaFutureImpl2.complete(new Config((Collection) repartitionTopicConfig().entrySet().stream().map(entry -> {
            return new ConfigEntry((String) entry.getKey(), (String) entry.getValue());
        }).collect(Collectors.toSet())));
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, "test_topic");
        Mockito.when(adminClient.describeConfigs(Collections.singleton(configResource))).thenAnswer(invocationOnMock2 -> {
            return new MockDescribeConfigsResult(Utils.mkMap(new Map.Entry[]{Utils.mkEntry(configResource, kafkaFutureImpl2)}));
        });
        InternalTopicConfig internalTopicConfig = setupRepartitionTopicConfig("test_topic", 1);
        Assertions.assertThrows(TimeoutException.class, () -> {
            internalTopicManager.validate(Collections.singletonMap("test_topic", internalTopicConfig));
        });
    }

    @Test
    public void shouldThrowTimeoutExceptionWhenFuturesNeverCompleteDuringValidation() {
        AdminClient adminClient = (AdminClient) Mockito.mock(AdminClient.class);
        InternalTopicManager internalTopicManager = new InternalTopicManager(new MockTime(((Integer) this.config.get(StreamsConfig.consumerPrefix("max.poll.interval.ms"))).intValue() / 3), adminClient, new StreamsConfig(this.config));
        KafkaFutureImpl kafkaFutureImpl = new KafkaFutureImpl();
        Mockito.when(adminClient.describeTopics(Collections.singleton("test_topic"))).thenAnswer(invocationOnMock -> {
            return new MockDescribeTopicsResult(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("test_topic", kafkaFutureImpl)}));
        });
        KafkaFutureImpl kafkaFutureImpl2 = new KafkaFutureImpl();
        kafkaFutureImpl2.complete(new Config((Collection) repartitionTopicConfig().entrySet().stream().map(entry -> {
            return new ConfigEntry((String) entry.getKey(), (String) entry.getValue());
        }).collect(Collectors.toSet())));
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, "test_topic");
        Mockito.when(adminClient.describeConfigs(Collections.singleton(configResource))).thenAnswer(invocationOnMock2 -> {
            return new MockDescribeConfigsResult(Utils.mkMap(new Map.Entry[]{Utils.mkEntry(configResource, kafkaFutureImpl2)}));
        });
        InternalTopicConfig internalTopicConfig = setupRepartitionTopicConfig("test_topic", 1);
        Assertions.assertThrows(TimeoutException.class, () -> {
            internalTopicManager.validate(Collections.singletonMap("test_topic", internalTopicConfig));
        });
    }

    private NewTopic newTopic(String str, InternalTopicConfig internalTopicConfig, StreamsConfig streamsConfig) {
        return new NewTopic(str, internalTopicConfig.numberOfPartitions(), Optional.of(Short.valueOf(streamsConfig.getInt("replication.factor").shortValue()))).configs(internalTopicConfig.getProperties(Collections.emptyMap(), streamsConfig.getLong("windowstore.changelog.additional.retention.ms").longValue()));
    }

    private Map<String, String> repartitionTopicConfig() {
        return Utils.mkMap(new Map.Entry[]{Utils.mkEntry("cleanup.policy", "delete"), Utils.mkEntry("retention.ms", "-1"), Utils.mkEntry("retention.bytes", (Object) null)});
    }

    private Map<String, String> unwindowedUnversionedChangelogConfig() {
        return Utils.mkMap(new Map.Entry[]{Utils.mkEntry("cleanup.policy", "compact")});
    }

    private Map<String, String> windowedChangelogConfig(long j) {
        return Utils.mkMap(new Map.Entry[]{Utils.mkEntry("cleanup.policy", "compact,delete"), Utils.mkEntry("retention.ms", String.valueOf(j)), Utils.mkEntry("retention.bytes", (Object) null)});
    }

    private Map<String, String> versionedChangelogConfig(long j) {
        return Utils.mkMap(new Map.Entry[]{Utils.mkEntry("cleanup.policy", "compact"), Utils.mkEntry("min.compaction.lag.ms", String.valueOf(j))});
    }

    private void setupTopicInMockAdminClient(String str, Map<String, String> map) {
        this.mockAdminClient.addTopic(false, str, Collections.singletonList(new TopicPartitionInfo(0, this.broker1, this.cluster, Collections.emptyList())), map);
    }

    private InternalTopicConfig setupUnwindowedUnversionedChangelogTopicConfig(String str, int i) {
        UnwindowedUnversionedChangelogTopicConfig unwindowedUnversionedChangelogTopicConfig = new UnwindowedUnversionedChangelogTopicConfig(str, Collections.emptyMap());
        unwindowedUnversionedChangelogTopicConfig.setNumberOfPartitions(i);
        return unwindowedUnversionedChangelogTopicConfig;
    }

    private InternalTopicConfig setupWindowedChangelogTopicConfig(String str, int i, long j) {
        WindowedChangelogTopicConfig windowedChangelogTopicConfig = new WindowedChangelogTopicConfig(str, Utils.mkMap(new Map.Entry[]{Utils.mkEntry("retention.ms", String.valueOf(j))}), 10L);
        windowedChangelogTopicConfig.setNumberOfPartitions(i);
        return windowedChangelogTopicConfig;
    }

    private InternalTopicConfig setupVersionedChangelogTopicConfig(String str, int i, long j) {
        VersionedChangelogTopicConfig versionedChangelogTopicConfig = new VersionedChangelogTopicConfig(str, Utils.mkMap(new Map.Entry[]{Utils.mkEntry("min.compaction.lag.ms", String.valueOf(j))}), 12L);
        versionedChangelogTopicConfig.setNumberOfPartitions(i);
        return versionedChangelogTopicConfig;
    }

    private InternalTopicConfig setupRepartitionTopicConfig(String str, int i) {
        RepartitionTopicConfig repartitionTopicConfig = new RepartitionTopicConfig(str, Collections.emptyMap());
        repartitionTopicConfig.setNumberOfPartitions(i);
        return repartitionTopicConfig;
    }
}
