package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.lang.reflect.Field;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.errors.StreamsNotStartedException;
import org.apache.kafka.streams.errors.StreamsStoppedException;
import org.apache.kafka.streams.errors.UnknownStateStoreException;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.query.FailureReason;
import org.apache.kafka.streams.query.KeyQuery;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.query.StateQueryRequest;
import org.apache.kafka.streams.query.StateQueryResult;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.internals.StoreQueryUtils;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;

@Tag("integration")
@Timeout(600)
/* loaded from: input_file:org/apache/kafka/streams/integration/IQv2IntegrationTest.class */
public class IQv2IntegrationTest {
    // ---- Shared, immutable fixtures -------------------------------------------------------

    /** Size of the embedded cluster that every test in this class runs against. */
    private static final int NUM_BROKERS = 1;

    /** Topic that {@code before()} seeds and that each test topology reads as a table. */
    private static final String INPUT_TOPIC_NAME = "input-topic";

    /** Name under which the table's key-value store is materialized and queried. */
    private static final String STORE_NAME = "kv-store";

    /** Not referenced anywhere else in this class; kept because it is public. */
    public static final Duration WINDOW_SIZE = Duration.ofMinutes(5);

    /** Filled in by {@code before()} with the offsets of the seeded records. */
    private static final Position INPUT_POSITION = Position.emptyPosition();

    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);

    // ---- Mutable state --------------------------------------------------------------------

    /** Counter used by {@code streamsConfiguration} to build a distinct application.server value. */
    private static int port = 0;

    /** The instance under test; created in {@code beforeTest} and closed in {@code afterTest}. */
    private KafkaStreams kafkaStreams;

    /**
     * Starts the embedded cluster, creates the two-partition input topic and seeds it with
     * three records. Record {@code i} has key {@code i}, value {@code i} and is sent to
     * partition {@code i % 2}. The acknowledged offsets are accumulated in
     * {@code INPUT_POSITION}, which is then checked against the expected end offsets
     * (partition 0 at offset 1, partition 1 at offset 0).
     *
     * @throws InterruptedException if interrupted while creating the topic or awaiting an ack
     * @throws IOException          if the embedded cluster fails to start
     * @throws ExecutionException   if a send fails
     * @throws TimeoutException     if a send is not acknowledged within one minute
     */
    @BeforeAll
    public static void before() throws InterruptedException, IOException, ExecutionException, TimeoutException {
        final int partitions = 2;
        final int replicationFactor = 1;
        final int recordCount = 3;

        CLUSTER.start();
        CLUSTER.createTopic(INPUT_TOPIC_NAME, partitions, replicationFactor);

        final Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", CLUSTER.bootstrapServers());
        producerProps.put("key.serializer", IntegerSerializer.class);
        producerProps.put("value.serializer", IntegerSerializer.class);

        final List<Future<RecordMetadata>> pendingSends = new LinkedList<>();
        // The producer stays open until every ack has been inspected and is closed on every
        // exit path, including a failed flush, a timed-out ack or a failed assertion.
        try (KafkaProducer<Integer, Integer> producer = new KafkaProducer<>(producerProps)) {
            for (int i = 0; i < recordCount; i++) {
                pendingSends.add(producer.send(new ProducerRecord<>(
                    INPUT_TOPIC_NAME,
                    i % partitions,
                    Time.SYSTEM.milliseconds(),
                    i,
                    i,
                    null
                )));
                // NOTE(review): presumably spaces out the record timestamps - confirm.
                Time.SYSTEM.sleep(1L);
            }
            producer.flush();

            for (final Future<RecordMetadata> pending : pendingSends) {
                final RecordMetadata metadata = pending.get(1L, TimeUnit.MINUTES);
                MatcherAssert.assertThat(metadata.hasOffset(), Matchers.is(true));
                // Return value intentionally ignored: the assertion below relies on
                // withComponent updating INPUT_POSITION itself.
                INPUT_POSITION.withComponent(metadata.topic(), metadata.partition(), metadata.offset());
            }
        }

        MatcherAssert.assertThat(
            INPUT_POSITION,
            Matchers.equalTo(
                Position.emptyPosition()
                    .withComponent(INPUT_TOPIC_NAME, 0, 1L)
                    .withComponent(INPUT_TOPIC_NAME, 1, 0L)
            )
        );
    }

    /**
     * Builds a fresh {@link KafkaStreams} instance for the test about to run. The topology
     * reads the input topic as an Integer/Integer table materialized under {@code STORE_NAME}.
     * The instance is cleaned up but not started; each test decides when to start it.
     *
     * @param testInfo supplies the test name from which the application id is derived
     */
    @BeforeEach
    public void beforeTest(TestInfo testInfo) {
        final StreamsBuilder topology = new StreamsBuilder();
        topology.table(
            INPUT_TOPIC_NAME,
            Consumed.with(Serdes.Integer(), Serdes.Integer()),
            Materialized.as(STORE_NAME)
        );

        final String uniqueName = IntegrationTestUtils.safeUniqueTestName(testInfo);
        final KafkaStreams streams = new KafkaStreams(topology.build(), streamsConfiguration(uniqueName));
        this.kafkaStreams = streams;
        streams.cleanUp();
    }

    /**
     * Closes whichever {@link KafkaStreams} instance the test left in {@code kafkaStreams}
     * and then cleans it up.
     */
    @AfterEach
    public void afterTest() {
        final KafkaStreams streams = this.kafkaStreams;
        streams.close();
        streams.cleanUp();
    }

    /**
     * Stops the embedded Kafka cluster once every test in the class has run.
     */
    @AfterAll
    public static void after() {
        CLUSTER.stop();
    }

    /**
     * A query against a store name other than the one the topology materializes must be
     * rejected with {@link UnknownStateStoreException}.
     */
    @Test
    public void shouldFailUnknownStore() {
        // The key is an arbitrary record key; it is unrelated to the broker count.
        final KeyQuery<Integer, ?> query = KeyQuery.withKey(1);
        final StateQueryRequest<?> request = StateQueryRequest.inStore("unknown-store").withQuery(query);

        Assertions.assertThrows(UnknownStateStoreException.class, () -> this.kafkaStreams.query(request));
    }

    /**
     * A query issued before {@code start()} has been called must be rejected with
     * {@link StreamsNotStartedException}.
     */
    @Test
    public void shouldFailNotStarted() {
        // The key is an arbitrary record key; it is unrelated to the broker count.
        final KeyQuery<Integer, ?> query = KeyQuery.withKey(1);
        final StateQueryRequest<?> request = StateQueryRequest.inStore(STORE_NAME).withQuery(query);

        Assertions.assertThrows(StreamsNotStartedException.class, () -> this.kafkaStreams.query(request));
    }

    /**
     * A query issued after the instance has been started and then closed (with a zero
     * timeout) must be rejected with {@link StreamsStoppedException}.
     */
    @Test
    public void shouldFailStopped() {
        // The key is an arbitrary record key; it is unrelated to the broker count.
        final KeyQuery<Integer, ?> query = KeyQuery.withKey(1);
        final StateQueryRequest<?> request = StateQueryRequest.inStore(STORE_NAME).withQuery(query);

        this.kafkaStreams.start();
        this.kafkaStreams.close(Duration.ZERO);

        Assertions.assertThrows(StreamsStoppedException.class, () -> this.kafkaStreams.query(request));
    }

    /**
     * A request built with {@code requireActive()} must fail per partition with
     * {@link FailureReason#NOT_ACTIVE} when the owning stream thread is not in the running
     * state.
     *
     * <p>The test reaches into {@link KafkaStreams} and {@link StreamThread} via reflection:
     * it waits until both partitions are queryable, then, while holding the thread's
     * {@code stateLock}, overwrites the thread's {@code state} field with
     * {@code PARTITIONS_ASSIGNED} and checks the query outcome. The overwritten state is not
     * restored afterwards.
     *
     * @throws NoSuchFieldException   if one of the reflected fields does not exist
     * @throws IllegalAccessException if one of the reflected fields cannot be read or written
     */
    @Test
    public void shouldRejectNonRunningActive() throws NoSuchFieldException, IllegalAccessException {
        // Key and partition ids are plain literals; they are unrelated to the broker count.
        final KeyQuery<Integer, ?> query = KeyQuery.withKey(1);
        final StateQueryRequest<?> activeOnlyRequest =
            StateQueryRequest.inStore(STORE_NAME).withQuery(query).requireActive();
        final Set<Integer> partitions = Utils.mkSet(0, 1);

        this.kafkaStreams.start();

        final Field threadsField = KafkaStreams.class.getDeclaredField("threads");
        threadsField.setAccessible(true);
        final StreamThread streamThread = (StreamThread) ((List<?>) threadsField.get(this.kafkaStreams)).get(0);

        final Field stateLockField = StreamThread.class.getDeclaredField("stateLock");
        stateLockField.setAccessible(true);
        final Object stateLock = stateLockField.get(streamThread);

        // First wait, without the active requirement, until both partitions answer.
        IntegrationTestUtils.iqv2WaitForPartitions(
            this.kafkaStreams,
            StateQueryRequest.inStore(STORE_NAME).withQuery(query),
            partitions
        );

        // Then hold the state lock, overwrite the thread state and assert on the outcome.
        synchronized (stateLock) {
            final Field stateField = StreamThread.class.getDeclaredField("state");
            stateField.setAccessible(true);
            stateField.set(streamThread, StreamThread.State.PARTITIONS_ASSIGNED);

            final StateQueryResult<?> result =
                IntegrationTestUtils.iqv2WaitForPartitions(this.kafkaStreams, activeOnlyRequest, partitions);

            MatcherAssert.assertThat(result.getPartitionResults().keySet(), Matchers.is(partitions));
            for (final Integer partition : partitions) {
                final QueryResult<?> partitionResult = result.getPartitionResults().get(partition);
                MatcherAssert.assertThat(partitionResult.isFailure(), Matchers.is(true));
                MatcherAssert.assertThat(partitionResult.getFailureReason(), Matchers.is(FailureReason.NOT_ACTIVE));
                MatcherAssert.assertThat(
                    partitionResult.getFailureMessage(),
                    Matchers.is("Query requires a running active task, but partition was in state PARTITIONS_ASSIGNED and was active.")
                );
            }
        }
    }

    /**
     * A request restricted to a single partition must come back with a result for exactly
     * that partition.
     */
    @Test
    public void shouldFetchFromPartition() {
        // Key and partition id are plain literals; they are unrelated to the broker count.
        final KeyQuery<Integer, ?> query = KeyQuery.withKey(1);
        final int partition = 1;
        final Set<Integer> partitions = Collections.singleton(partition);
        final StateQueryRequest<?> request =
            StateQueryRequest.inStore(STORE_NAME).withQuery(query).withPartitions(partitions);

        this.kafkaStreams.start();

        final StateQueryResult<?> result = IntegrationTestUtils.iqv2WaitForResult(this.kafkaStreams, request);
        MatcherAssert.assertThat(result.getPartitionResults().keySet(), Matchers.equalTo(partitions));
    }

    /**
     * A request built with {@code withAllPartitions()} must come back with results for both
     * partitions (0 and 1) of the input topic.
     */
    @Test
    public void shouldFetchExplicitlyFromAllPartitions() {
        // Key and partition ids are plain literals; they are unrelated to the broker count.
        final KeyQuery<Integer, ?> query = KeyQuery.withKey(1);
        final Set<Integer> partitions = Utils.mkSet(0, 1);
        final StateQueryRequest<?> request =
            StateQueryRequest.inStore(STORE_NAME).withQuery(query).withAllPartitions();

        this.kafkaStreams.start();

        final StateQueryResult<?> result =
            IntegrationTestUtils.iqv2WaitForPartitions(this.kafkaStreams, request, partitions);
        MatcherAssert.assertThat(result.getPartitionResults().keySet(), Matchers.equalTo(partitions));
    }

    /**
     * A store implementation that supplies no query handling of its own must still be usable
     * in a topology; querying it yields a per-partition failure with
     * {@link FailureReason#UNKNOWN_QUERY_TYPE} instead of an exception.
     *
     * <p>The test replaces the instance created in {@code beforeTest} with one whose table is
     * backed by a minimal in-memory {@link KeyValueStore} defined inline. That store declares
     * only the basic read/write operations and position tracking, and no {@code query}
     * method.
     *
     * @param testInfo supplies the test name from which the application id is derived
     */
    @Test
    public void shouldNotRequireQueryHandler(TestInfo testInfo) {
        // Key and partition id are plain literals; they are unrelated to the broker count.
        final KeyQuery<Integer, ?> query = KeyQuery.withKey(1);
        final int partition = 1;
        final Set<Integer> partitions = Collections.singleton(partition);
        final StateQueryRequest<?> request =
            StateQueryRequest.inStore(STORE_NAME).withQuery(query).withPartitions(partitions);

        final StreamsBuilder builder = new StreamsBuilder();
        builder.table(
            INPUT_TOPIC_NAME,
            Consumed.with(Serdes.Integer(), Serdes.Integer()),
            Materialized.as(new KeyValueBytesStoreSupplier() {
                @Override
                public String name() {
                    return STORE_NAME;
                }

                // Must be named get() to implement the supplier contract.
                @Override
                public KeyValueStore<Bytes, byte[]> get() {
                    return new KeyValueStore<Bytes, byte[]>() {
                        private boolean open = false;
                        private final Map<Bytes, byte[]> map = new HashMap<>();
                        // Both are assigned in init(StateStoreContext, StateStore); the write
                        // methods lock on position and therefore must not run before init.
                        private Position position;
                        private StateStoreContext context;

                        @Override
                        public void put(Bytes key, byte[] value) {
                            synchronized (this.position) {
                                this.map.put(key, value);
                                StoreQueryUtils.updatePosition(this.position, this.context);
                            }
                        }

                        @Override
                        public byte[] putIfAbsent(Bytes key, byte[] value) {
                            synchronized (this.position) {
                                StoreQueryUtils.updatePosition(this.position, this.context);
                                return this.map.putIfAbsent(key, value);
                            }
                        }

                        @Override
                        public void putAll(List<KeyValue<Bytes, byte[]>> entries) {
                            synchronized (this.position) {
                                StoreQueryUtils.updatePosition(this.position, this.context);
                                for (final KeyValue<Bytes, byte[]> entry : entries) {
                                    this.map.put(entry.key, entry.value);
                                }
                            }
                        }

                        @Override
                        public byte[] delete(Bytes key) {
                            synchronized (this.position) {
                                StoreQueryUtils.updatePosition(this.position, this.context);
                                return this.map.remove(key);
                            }
                        }

                        @Override
                        public String name() {
                            return STORE_NAME;
                        }

                        @Deprecated
                        @Override
                        public void init(ProcessorContext context, StateStore root) {
                            throw new UnsupportedOperationException();
                        }

                        @Override
                        public void init(StateStoreContext context, StateStore root) {
                            // Restored records are written through put(), which needs
                            // position and context to be set by then.
                            context.register(root, (key, value) -> put(Bytes.wrap(key), value));
                            this.open = true;
                            this.position = Position.emptyPosition();
                            this.context = context;
                        }

                        @Override
                        public void flush() {
                            // Nothing to flush: the data lives only in the in-memory map.
                        }

                        @Override
                        public void close() {
                            this.open = false;
                            this.map.clear();
                        }

                        @Override
                        public boolean persistent() {
                            return false;
                        }

                        @Override
                        public boolean isOpen() {
                            return this.open;
                        }

                        @Override
                        public Position getPosition() {
                            return this.position;
                        }

                        @Override
                        public byte[] get(Bytes key) {
                            return this.map.get(key);
                        }

                        @Override
                        public KeyValueIterator<Bytes, byte[]> range(Bytes from, Bytes to) {
                            throw new UnsupportedOperationException();
                        }

                        @Override
                        public KeyValueIterator<Bytes, byte[]> all() {
                            throw new UnsupportedOperationException();
                        }

                        @Override
                        public long approximateNumEntries() {
                            return this.map.size();
                        }
                    };
                }

                @Override
                public String metricsScope() {
                    return "nonquery";
                }
            })
        );

        // Swap out the instance created in beforeTest for one built on the custom store.
        this.kafkaStreams.close();
        this.kafkaStreams = new KafkaStreams(
            builder.build(),
            streamsConfiguration(IntegrationTestUtils.safeUniqueTestName(testInfo))
        );
        this.kafkaStreams.cleanUp();
        this.kafkaStreams.start();

        final StateQueryResult<?> result = IntegrationTestUtils.iqv2WaitForResult(this.kafkaStreams, request);
        final QueryResult<?> queryResult = result.getPartitionResults().get(partition);

        MatcherAssert.assertThat(queryResult.isFailure(), Matchers.is(true));
        MatcherAssert.assertThat(queryResult.getFailureReason(), Matchers.is(FailureReason.UNKNOWN_QUERY_TYPE));
        MatcherAssert.assertThat(
            queryResult.getFailureMessage(),
            Matchers.matchesPattern("This store (.*) doesn't know how to execute the given query (.*). Contact the store maintainer if you need support for a new query type.")
        );
    }

    /**
     * Builds the Streams configuration for one {@link KafkaStreams} instance.
     *
     * <p>Each call increments the static {@code port} counter and uses the new value in
     * {@code application.server}, and requests a new temporary directory for
     * {@code state.dir}.
     *
     * @param str suffix appended to {@code "app-"} to form the application id
     * @return a new {@link Properties} holding the configuration
     */
    private Properties streamsConfiguration(String str) {
        final Properties config = new Properties();
        config.put("topology.optimization", "all");
        config.put("application.id", "app-" + str);
        // Pre-increment: the first instance gets localhost:1, the next localhost:2, and so on.
        config.put("application.server", "localhost:" + (++port));
        config.put("bootstrap.servers", CLUSTER.bootstrapServers());
        config.put("state.dir", TestUtils.tempDirectory().getPath());
        config.put("default.key.serde", Serdes.Integer().getClass());
        config.put("default.value.serde", Serdes.Integer().getClass());
        // Replica and thread counts are fixed settings, independent of the broker count.
        config.put("num.standby.replicas", 1);
        config.put("max.poll.records", 100);
        config.put("heartbeat.interval.ms", 200);
        config.put("session.timeout.ms", 1000);
        config.put("commit.interval.ms", 100L);
        config.put("num.stream.threads", 1);
        return config;
    }
}
