package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TaskMetadata;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;

@Tag("integration")
@Timeout(600)
/* loaded from: input_file:org/apache/kafka/streams/integration/TaskMetadataIntegrationTest.class */
public class TaskMetadataIntegrationTest {
    public TestInfo testInfo;
    private String inputTopic;
    private static StreamsBuilder builder;
    private static Properties properties;
    private static String appId;
    private AtomicBoolean process;
    private AtomicBoolean commit;
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1, new Properties(), Collections.emptyList(), 0, 0);
    private static String appIdPrefix = "TaskMetadataTest_";

    /* loaded from: input_file:org/apache/kafka/streams/integration/TaskMetadataIntegrationTest$PauseProcessor.class */
    private class PauseProcessor extends ContextualProcessor<String, String, Void, Void> {
        private PauseProcessor() {
        }

        public void process(Record<String, String> record) {
            while (!TaskMetadataIntegrationTest.this.process.get()) {
                try {
                    wait(100L);
                } catch (InterruptedException e) {
                }
            }
            if (TaskMetadataIntegrationTest.this.commit.get()) {
                context().commit();
            }
            TaskMetadataIntegrationTest.this.process.set(false);
        }
    }

    @BeforeAll
    public static void startCluster() throws IOException {
        CLUSTER.start();
    }

    @AfterAll
    public static void closeCluster() {
        CLUSTER.stop();
    }

    @BeforeEach
    public void setup(TestInfo testInfo) {
        String safeUniqueTestName = IntegrationTestUtils.safeUniqueTestName(testInfo);
        appId = appIdPrefix + safeUniqueTestName;
        this.inputTopic = "input" + safeUniqueTestName;
        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, this.inputTopic);
        builder = new StreamsBuilder();
        this.process = new AtomicBoolean(true);
        this.commit = new AtomicBoolean(true);
        builder.stream(this.inputTopic).process(() -> {
            return new PauseProcessor();
        }, new String[0]);
        properties = Utils.mkObjectProperties(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("bootstrap.servers", CLUSTER.bootstrapServers()), Utils.mkEntry("application.id", appId), Utils.mkEntry("state.dir", TestUtils.tempDirectory().getPath()), Utils.mkEntry("num.stream.threads", 2), Utils.mkEntry("default.key.serde", Serdes.StringSerde.class), Utils.mkEntry("default.value.serde", Serdes.StringSerde.class), Utils.mkEntry("commit.interval.ms", 1L)}));
    }

    @Test
    public void shouldReportCorrectCommittedOffsetInformation() {
        try {
            KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties);
            try {
                IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
                TaskMetadata taskMetadata = getTaskMetadata(kafkaStreams);
                MatcherAssert.assertThat(Integer.valueOf(taskMetadata.committedOffsets().size()), CoreMatchers.equalTo(1));
                TopicPartition topicPartition = new TopicPartition(this.inputTopic, 0);
                produceMessages(0L, this.inputTopic, "test");
                TestUtils.waitForCondition(() -> {
                    return !this.process.get();
                }, "The record was not processed");
                TestUtils.waitForCondition(() -> {
                    return ((Long) taskMetadata.committedOffsets().get(topicPartition)).longValue() == 1;
                }, "the record was processed");
                this.process.set(true);
                produceMessages(0L, this.inputTopic, "test1");
                TestUtils.waitForCondition(() -> {
                    return !this.process.get();
                }, "The record was not processed");
                TestUtils.waitForCondition(() -> {
                    return ((Long) taskMetadata.committedOffsets().get(topicPartition)).longValue() == 2;
                }, "the record was processed");
                this.process.set(true);
                produceMessages(0L, this.inputTopic, "test1");
                TestUtils.waitForCondition(() -> {
                    return !this.process.get();
                }, "The record was not processed");
                TestUtils.waitForCondition(() -> {
                    return ((Long) taskMetadata.committedOffsets().get(topicPartition)).longValue() == 3;
                }, "the record was processed");
                kafkaStreams.close();
            } finally {
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Test
    public void shouldReportCorrectEndOffsetInformation() {
        try {
            KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties);
            try {
                IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
                TaskMetadata taskMetadata = getTaskMetadata(kafkaStreams);
                MatcherAssert.assertThat(Integer.valueOf(taskMetadata.endOffsets().size()), CoreMatchers.equalTo(1));
                TopicPartition topicPartition = new TopicPartition(this.inputTopic, 0);
                this.commit.set(false);
                for (int i = 0; i < 10; i++) {
                    produceMessages(0L, this.inputTopic, "test");
                    TestUtils.waitForCondition(() -> {
                        return !this.process.get();
                    }, "The record was not processed");
                    this.process.set(true);
                }
                MatcherAssert.assertThat((Long) taskMetadata.endOffsets().get(topicPartition), CoreMatchers.equalTo(9L));
                kafkaStreams.close();
            } finally {
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private TaskMetadata getTaskMetadata(KafkaStreams kafkaStreams) throws InterruptedException {
        AtomicReference atomicReference = new AtomicReference();
        TestUtils.waitForCondition(() -> {
            atomicReference.set((List) kafkaStreams.metadataForLocalThreads().stream().flatMap(threadMetadata -> {
                return threadMetadata.activeTasks().stream();
            }).collect(Collectors.toList()));
            return ((List) atomicReference.get()).size() == 1;
        }, "The number of active tasks returned in the allotted time was not one.");
        return (TaskMetadata) ((List) atomicReference.get()).get(0);
    }

    @AfterEach
    public void teardown() throws IOException {
        IntegrationTestUtils.purgeLocalStreamsState(properties);
    }

    private void produceMessages(long j, String str, String str2) {
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(str, Collections.singletonList(new KeyValue("1", str2)), TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class, new Properties()), Long.valueOf(j));
    }
}
