package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.query.FailureReason;
import org.apache.kafka.streams.query.KeyQuery;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.query.PositionBound;
import org.apache.kafka.streams.query.Query;
import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.query.RangeQuery;
import org.apache.kafka.streams.query.StateQueryRequest;
import org.apache.kafka.streams.query.StateQueryResult;
import org.apache.kafka.streams.query.TimestampedKeyQuery;
import org.apache.kafka.streams.query.TimestampedRangeQuery;
import org.apache.kafka.streams.query.WindowKeyQuery;
import org.apache.kafka.streams.query.WindowRangeQuery;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.StoreSupplier;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.TimestampedWindowStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Tag("integration")
@Timeout(600)
/* loaded from: input_file:org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.class */
public class IQv2StoreIntegrationTest {
    /** Name given to every state store built by this test, regardless of store type. */
    private static final String STORE_NAME = "kv-store";
    /** Streams instance under test; assigned by {@code setup} and closed by {@code afterTest}. */
    private KafkaStreams kafkaStreams;
    private static final Logger LOG = LoggerFactory.getLogger(IQv2StoreIntegrationTest.class);
    /** Seed for {@link #RANDOM}; logged by {@code data()} so a given shuffle order can be identified. */
    private static final long SEED = new Random().nextLong();
    /** Random source seeded with {@link #SEED}, used to shuffle the generated test cases. */
    private static final Random RANDOM = new Random(SEED);
    /** Window size (and session inactivity gap) used by the windowed and session topologies. */
    public static final Duration WINDOW_SIZE = Duration.ofMinutes(5);
    // NOTE(review): not referenced in the visible part of this file; presumably consumed by
    // streamsConfiguration(...) -- confirm before relying on it.
    private static int port = 0;
    /** Highest offset seen per input-topic partition; filled in by {@code before()}. */
    private static final Position INPUT_POSITION = Position.emptyPosition();
    /** Wall-clock time captured once, when this class is initialised. */
    private static final long RECORD_TIME = System.currentTimeMillis();
    /** {@link #RECORD_TIME} rounded down to a multiple of {@link #WINDOW_SIZE}; base timestamp of the input records. */
    private static final long WINDOW_START = (RECORD_TIME / WINDOW_SIZE.toMillis()) * WINDOW_SIZE.toMillis();
    private static final int NUM_BROKERS = 1;
    /** Embedded cluster shared by all test cases; started in {@code before()} and stopped in {@code after()}. */
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
    private static final String INPUT_TOPIC_NAME = "input-topic";
    /** Position containing only partition 0 of the input topic, at offset 5. */
    private static final Position POSITION_0 = Position.fromMap(Utils.mkMap(new Map.Entry[]{Utils.mkEntry(INPUT_TOPIC_NAME, Utils.mkMap(new Map.Entry[]{Utils.mkEntry(0, 5L)}))}));

    /**
     * The state-store variants exercised by this test.
     *
     * <p>Each constant provides the {@link StoreSupplier} that builds the store and carries
     * three traits fixed at construction time: which store family it belongs to, whether it
     * is registered as a global store, and whether it is a timestamped store.
     */
    public enum StoresToTest {
        GLOBAL_IN_MEMORY_KV(Family.KEY_VALUE, true, true) {
            @Override
            public StoreSupplier<?> supplier() {
                return Stores.inMemoryKeyValueStore(STORE_NAME);
            }
        },
        GLOBAL_IN_MEMORY_LRU(Family.KEY_VALUE, true, true) {
            @Override
            public StoreSupplier<?> supplier() {
                return Stores.lruMap(STORE_NAME, 100);
            }
        },
        GLOBAL_ROCKS_KV(Family.KEY_VALUE, true, false) {
            @Override
            public StoreSupplier<?> supplier() {
                return Stores.persistentKeyValueStore(STORE_NAME);
            }
        },
        GLOBAL_TIME_ROCKS_KV(Family.KEY_VALUE, true, true) {
            @Override
            public StoreSupplier<?> supplier() {
                return Stores.persistentTimestampedKeyValueStore(STORE_NAME);
            }
        },
        IN_MEMORY_KV(Family.KEY_VALUE, false, true) {
            @Override
            public StoreSupplier<?> supplier() {
                return Stores.inMemoryKeyValueStore(STORE_NAME);
            }
        },
        IN_MEMORY_LRU(Family.KEY_VALUE, false, true) {
            @Override
            public StoreSupplier<?> supplier() {
                return Stores.lruMap(STORE_NAME, 100);
            }
        },
        ROCKS_KV(Family.KEY_VALUE, false, false) {
            @Override
            public StoreSupplier<?> supplier() {
                return Stores.persistentKeyValueStore(STORE_NAME);
            }
        },
        TIME_ROCKS_KV(Family.KEY_VALUE, false, true) {
            @Override
            public StoreSupplier<?> supplier() {
                return Stores.persistentTimestampedKeyValueStore(STORE_NAME);
            }
        },
        IN_MEMORY_WINDOW(Family.WINDOW, false, true) {
            @Override
            public StoreSupplier<?> supplier() {
                return Stores.inMemoryWindowStore(STORE_NAME, Duration.ofDays(1L), WINDOW_SIZE, false);
            }
        },
        ROCKS_WINDOW(Family.WINDOW, false, false) {
            @Override
            public StoreSupplier<?> supplier() {
                return Stores.persistentWindowStore(STORE_NAME, Duration.ofDays(1L), WINDOW_SIZE, false);
            }
        },
        TIME_ROCKS_WINDOW(Family.WINDOW, false, true) {
            @Override
            public StoreSupplier<?> supplier() {
                return Stores.persistentTimestampedWindowStore(STORE_NAME, Duration.ofDays(1L), WINDOW_SIZE, false);
            }
        },
        IN_MEMORY_SESSION(Family.SESSION, false, true) {
            @Override
            public StoreSupplier<?> supplier() {
                return Stores.inMemorySessionStore(STORE_NAME, Duration.ofDays(1L));
            }
        },
        ROCKS_SESSION(Family.SESSION, false, true) {
            @Override
            public StoreSupplier<?> supplier() {
                return Stores.persistentSessionStore(STORE_NAME, Duration.ofDays(1L));
            }
        };

        /** Store family a constant belongs to; a constant is in exactly one family. */
        private enum Family { KEY_VALUE, WINDOW, SESSION }

        private final Family family;
        private final boolean globalStore;
        private final boolean timestampedStore;

        /**
         * @param family           store family of the constant
         * @param globalStore      value reported by {@link #global()}
         * @param timestampedStore value reported by {@link #timestamped()}
         */
        StoresToTest(final Family family, final boolean globalStore, final boolean timestampedStore) {
            this.family = family;
            this.globalStore = globalStore;
            this.timestampedStore = timestampedStore;
        }

        /** @return the supplier that creates the store for this variant */
        public abstract StoreSupplier<?> supplier();

        /** @return {@code true} unless the variant was declared as a non-timestamped store */
        public boolean timestamped() {
            return timestampedStore;
        }

        /** @return {@code true} if the variant is one of the {@code GLOBAL_*} stores */
        public boolean global() {
            return globalStore;
        }

        /** @return {@code true} if the variant is a key-value store */
        public boolean keyValue() {
            return family == Family.KEY_VALUE;
        }

        /** @return {@code true} if the variant is a window store */
        public boolean isWindowed() {
            return family == Family.WINDOW;
        }

        /** @return {@code true} if the variant is a session store */
        public boolean isSession() {
            return family == Family.SESSION;
        }
    }

    /**
     * A {@link Query} implementation with no fields or behaviour of its own.
     *
     * <p>NOTE(review): presumably submitted by {@code shouldRejectUnknownQuery()} as a query
     * type no store recognises -- that method is outside the visible part of the file; confirm.
     */
    public static class UnknownQuery implements Query<Void> {
    }

    /**
     * Builds the parameter sets for {@code verifyStore}: every combination of the first
     * boolean flag, the second boolean flag, each {@link StoresToTest} constant (passed by
     * name) and the topology kind ({@code "DSL"} or {@code "PAPI"}).
     *
     * <p>The combinations are shuffled with {@link #RANDOM}; the seed is logged first so the
     * order of a particular run can be identified from the logs.
     *
     * @return a stream of four-element argument sets in shuffled order
     */
    public static Stream<Arguments> data() {
        LOG.info("Generating test cases according to random seed: {}", SEED);
        final List<Arguments> testCases = new ArrayList<>();
        for (final boolean cacheEnabled : Arrays.asList(true, false)) {
            for (final boolean logEnabled : Arrays.asList(true, false)) {
                // Plain for-each: the step over the stores must not depend on the broker count.
                for (final StoresToTest store : StoresToTest.values()) {
                    for (final String kind : Arrays.asList("DSL", "PAPI")) {
                        testCases.add(Arguments.of(cacheEnabled, logEnabled, store.name(), kind));
                    }
                }
            }
        }
        Collections.shuffle(testCases, RANDOM);
        return testCases.stream();
    }

    /**
     * Starts the embedded cluster, (re)creates the two-partition input topic and produces the
     * ten input records every test case queries.
     *
     * <p>Record {@code i} (0..9) has key {@code i / 2}, value {@code i}, partition
     * {@code (i / 2) % 2} and timestamp {@code WINDOW_START + i * 2 minutes}. The offset of each
     * acknowledged record is merged into {@link #INPUT_POSITION}, which must end at offset 5 for
     * partition 0 and offset 3 for partition 1.
     *
     * @throws InterruptedException if interrupted while waiting on the cluster or a send
     * @throws IOException          if the embedded cluster fails to start
     * @throws ExecutionException   if a send fails
     * @throws TimeoutException     if a send is not acknowledged within one minute
     */
    @BeforeAll
    public static void before() throws InterruptedException, IOException, ExecutionException, TimeoutException {
        final int partitions = 2;
        CLUSTER.start();
        CLUSTER.deleteAllTopicsAndWait(IntegrationTestUtils.DEFAULT_TIMEOUT);
        CLUSTER.createTopic(INPUT_TOPIC_NAME, partitions, 1);

        final Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", CLUSTER.bootstrapServers());
        producerProps.put("key.serializer", IntegerSerializer.class);
        producerProps.put("value.serializer", IntegerSerializer.class);

        final List<Future<RecordMetadata>> futures = new ArrayList<>();
        // try-with-resources covers the sends AND the acknowledgement checks, so the producer
        // is closed even when flush(), Future.get() or an assertion fails.
        try (KafkaProducer<Integer, Integer> producer = new KafkaProducer<>(producerProps)) {
            for (int i = 0; i < 10; i++) {
                final int key = i / 2;
                final long timestamp = WINDOW_START + Duration.ofMinutes(2L).toMillis() * i;
                futures.add(producer.send(new ProducerRecord<>(INPUT_TOPIC_NAME, key % partitions, timestamp, key, i, null)));
                Time.SYSTEM.sleep(1L);
            }
            producer.flush();

            for (final Future<RecordMetadata> future : futures) {
                final RecordMetadata metadata = future.get(1L, TimeUnit.MINUTES);
                MatcherAssert.assertThat(metadata.hasOffset(), Matchers.is(true));
                INPUT_POSITION.withComponent(metadata.topic(), metadata.partition(), metadata.offset());
            }
        }

        MatcherAssert.assertThat(INPUT_POSITION, Matchers.equalTo(Position.emptyPosition().withComponent(INPUT_TOPIC_NAME, 0, 5L).withComponent(INPUT_TOPIC_NAME, 1, 3L)));
    }

    /**
     * Builds the topology matching the store variant and topology kind, then starts a
     * {@link KafkaStreams} instance for it and stores it in {@code kafkaStreams}.
     *
     * @param z            forwarded to the topology builders as the caching flag
     * @param z2           forwarded to the topology builders as the logging flag
     * @param storesToTest store variant whose supplier is wired into the topology
     * @param str          topology kind, {@code "DSL"} or {@code "PAPI"}
     * @throws AssertionError if {@code str} is neither kind or the supplier is not a
     *                        key-value, window or session supplier
     */
    public void setup(boolean z, boolean z2, StoresToTest storesToTest, String str) {
        final StoreSupplier<?> storeSupplier = storesToTest.supplier();
        final Properties config = streamsConfiguration(z, z2, storesToTest.name(), str);
        final StreamsBuilder builder = new StreamsBuilder();

        final boolean dsl = Objects.equals(str, "DSL");
        final boolean papi = Objects.equals(str, "PAPI");
        if (!dsl && !papi) {
            throw new AssertionError("Store supplier is an unrecognized type.");
        }

        // From here on exactly one of dsl / papi is true, so "not dsl" means PAPI.
        if (storeSupplier instanceof KeyValueBytesStoreSupplier) {
            final KeyValueBytesStoreSupplier keyValueSupplier = (KeyValueBytesStoreSupplier) storeSupplier;
            if (dsl) {
                setUpKeyValueDSLTopology(keyValueSupplier, builder, z, z2, storesToTest);
            } else {
                setUpKeyValuePAPITopology(keyValueSupplier, builder, z, z2, storesToTest);
            }
        } else if (storeSupplier instanceof WindowBytesStoreSupplier) {
            final WindowBytesStoreSupplier windowSupplier = (WindowBytesStoreSupplier) storeSupplier;
            if (dsl) {
                setUpWindowDSLTopology(windowSupplier, builder, z, z2);
            } else {
                setUpWindowPAPITopology(windowSupplier, builder, z, z2, storesToTest);
            }
        } else if (storeSupplier instanceof SessionBytesStoreSupplier) {
            final SessionBytesStoreSupplier sessionSupplier = (SessionBytesStoreSupplier) storeSupplier;
            if (dsl) {
                setUpSessionDSLTopology(sessionSupplier, builder, z, z2);
            } else {
                setUpSessionPAPITopology(sessionSupplier, builder, z, z2, storesToTest);
            }
        } else {
            throw new AssertionError("Store supplier is an unrecognized type.");
        }

        this.kafkaStreams = IntegrationTestUtils.getStartedStreams(config, builder, true);
    }

    /**
     * Adds a DSL topology that session-windows the input topic by key and sums the values
     * into the given session store.
     *
     * @param sessionBytesStoreSupplier supplier of the session store to materialize into
     * @param streamsBuilder            builder the topology is added to
     * @param z                         {@code true} to enable caching on the store, {@code false} to disable it
     * @param z2                        {@code true} to enable changelogging (with an empty topic config), {@code false} to disable it
     */
    private void setUpSessionDSLTopology(SessionBytesStoreSupplier sessionBytesStoreSupplier, StreamsBuilder streamsBuilder, boolean z, boolean z2) {
        final Materialized materialized = Materialized.as(sessionBytesStoreSupplier);

        // The two toggles are independent of each other; each mutates `materialized` in place.
        if (z2) {
            materialized.withLoggingEnabled(Collections.emptyMap());
        } else {
            materialized.withLoggingDisabled();
        }
        if (z) {
            materialized.withCachingEnabled();
        } else {
            materialized.withCachingDisabled();
        }

        streamsBuilder
            .stream(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer()))
            .groupByKey()
            .windowedBy(SessionWindows.ofInactivityGapWithNoGrace(WINDOW_SIZE))
            .aggregate(
                () -> 0,
                (key, value, aggregate) -> aggregate + value,
                (key, left, right) -> left + right,
                materialized);
    }

    /**
     * Adds a DSL topology that groups the input topic by key into time windows of
     * {@link #WINDOW_SIZE} and sums the values into the given window store.
     *
     * @param windowBytesStoreSupplier supplier of the window store to materialize into
     * @param streamsBuilder           builder the topology is added to
     * @param z                        {@code true} to enable caching on the store, {@code false} to disable it
     * @param z2                       {@code true} to enable changelogging (with an empty topic config), {@code false} to disable it
     */
    private void setUpWindowDSLTopology(WindowBytesStoreSupplier windowBytesStoreSupplier, StreamsBuilder streamsBuilder, boolean z, boolean z2) {
        final Materialized materialized = Materialized.as(windowBytesStoreSupplier);

        // The two toggles are independent of each other; each mutates `materialized` in place.
        if (z2) {
            materialized.withLoggingEnabled(Collections.emptyMap());
        } else {
            materialized.withLoggingDisabled();
        }
        if (z) {
            materialized.withCachingEnabled();
        } else {
            materialized.withCachingDisabled();
        }

        streamsBuilder
            .stream(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer()))
            .groupByKey()
            .windowedBy(TimeWindows.ofSizeWithNoGrace(WINDOW_SIZE))
            .aggregate(
                () -> 0,
                (key, value, aggregate) -> aggregate + value,
                materialized);
    }

    /**
     * Adds a DSL topology that reads the input topic as a table backed by the given
     * key-value store: a global table for global store variants, a regular table otherwise.
     *
     * @param keyValueBytesStoreSupplier supplier of the key-value store to materialize into
     * @param streamsBuilder             builder the topology is added to
     * @param z                          {@code true} to enable caching on the store, {@code false} to disable it
     * @param z2                         {@code true} to enable changelogging (with an empty topic config), {@code false} to disable it
     * @param storesToTest               store variant; only {@code global()} is consulted
     */
    private void setUpKeyValueDSLTopology(KeyValueBytesStoreSupplier keyValueBytesStoreSupplier, StreamsBuilder streamsBuilder, boolean z, boolean z2, StoresToTest storesToTest) {
        final Materialized materialized = Materialized.as(keyValueBytesStoreSupplier);

        // The two toggles are independent of each other; each mutates `materialized` in place.
        if (z2) {
            materialized.withLoggingEnabled(Collections.emptyMap());
        } else {
            materialized.withLoggingDisabled();
        }
        if (z) {
            materialized.withCachingEnabled();
        } else {
            materialized.withCachingDisabled();
        }

        final Consumed<Integer, Integer> consumed = Consumed.with(Serdes.Integer(), Serdes.Integer());
        if (!storesToTest.global()) {
            streamsBuilder.table(INPUT_TOPIC_NAME, consumed, materialized);
            return;
        }
        streamsBuilder.globalTable(INPUT_TOPIC_NAME, consumed, materialized);
    }

    /**
     * Adds a Processor API topology whose processor writes every input record into the given
     * key-value store.
     *
     * <p>For timestamped variants the store is built as a timestamped key-value store and each
     * value is stored together with the record timestamp; otherwise the plain value is stored.
     * Global variants are registered with {@code addGlobalStore}, the rest as a regular state
     * store connected to a processor on the input stream.
     *
     * @param keyValueBytesStoreSupplier supplier of the underlying bytes store
     * @param streamsBuilder             builder the topology is added to
     * @param z                          {@code true} to enable caching on the store, {@code false} to disable it
     * @param z2                         {@code true} to enable changelogging (with an empty topic config), {@code false} to disable it
     * @param storesToTest               store variant; {@code timestamped()} and {@code global()} are consulted
     */
    private void setUpKeyValuePAPITopology(KeyValueBytesStoreSupplier keyValueBytesStoreSupplier, StreamsBuilder streamsBuilder, boolean z, boolean z2, StoresToTest storesToTest) {
        final StoreBuilder<?> storeBuilder;
        final ProcessorSupplier<Integer, Integer, Void, Void> processorSupplier;
        if (storesToTest.timestamped()) {
            storeBuilder = Stores.timestampedKeyValueStoreBuilder(keyValueBytesStoreSupplier, Serdes.Integer(), Serdes.Integer());
            processorSupplier = () -> new ContextualProcessor<Integer, Integer, Void, Void>() {
                @Override
                public void process(final Record<Integer, Integer> record) {
                    // getStateStore is generic in its return type; the typed local is what
                    // makes put(...) resolvable.
                    final TimestampedKeyValueStore<Integer, Integer> store = context().getStateStore(storeBuilder.name());
                    store.put(record.key(), ValueAndTimestamp.make(record.value(), record.timestamp()));
                }
            };
        } else {
            storeBuilder = Stores.keyValueStoreBuilder(keyValueBytesStoreSupplier, Serdes.Integer(), Serdes.Integer());
            processorSupplier = () -> new ContextualProcessor<Integer, Integer, Void, Void>() {
                @Override
                public void process(final Record<Integer, Integer> record) {
                    final KeyValueStore<Integer, Integer> store = context().getStateStore(storeBuilder.name());
                    store.put(record.key(), record.value());
                }
            };
        }

        if (z) {
            storeBuilder.withCachingEnabled();
        } else {
            storeBuilder.withCachingDisabled();
        }
        if (z2) {
            storeBuilder.withLoggingEnabled(Collections.emptyMap());
        } else {
            storeBuilder.withLoggingDisabled();
        }

        if (storesToTest.global()) {
            streamsBuilder.addGlobalStore(storeBuilder, INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer()), processorSupplier);
        } else {
            streamsBuilder.addStateStore(storeBuilder);
            streamsBuilder.stream(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer())).process(processorSupplier, storeBuilder.name());
        }
    }

    /**
     * Adds a Processor API topology whose processor writes every input record into the given
     * window store, under the window whose start is the record timestamp rounded down to a
     * multiple of {@link #WINDOW_SIZE}.
     *
     * <p>For timestamped variants the store is built as a timestamped window store and each
     * value is stored together with the record timestamp; otherwise the plain value is stored.
     *
     * @param windowBytesStoreSupplier supplier of the underlying bytes store
     * @param streamsBuilder           builder the topology is added to
     * @param z                        {@code true} to enable caching on the store, {@code false} to disable it
     * @param z2                       {@code true} to enable changelogging (with an empty topic config), {@code false} to disable it
     * @param storesToTest             store variant; {@code timestamped()} and {@code global()} are consulted
     */
    private void setUpWindowPAPITopology(WindowBytesStoreSupplier windowBytesStoreSupplier, StreamsBuilder streamsBuilder, boolean z, boolean z2, StoresToTest storesToTest) {
        final StoreBuilder<?> storeBuilder;
        final ProcessorSupplier<Integer, Integer, Void, Void> processorSupplier;
        if (storesToTest.timestamped()) {
            storeBuilder = Stores.timestampedWindowStoreBuilder(windowBytesStoreSupplier, Serdes.Integer(), Serdes.Integer());
            processorSupplier = () -> new ContextualProcessor<Integer, Integer, Void, Void>() {
                @Override
                public void process(final Record<Integer, Integer> record) {
                    // getStateStore is generic in its return type; the typed local is what
                    // makes put(...) resolvable.
                    final TimestampedWindowStore<Integer, Integer> store = context().getStateStore(storeBuilder.name());
                    final long windowSizeMs = WINDOW_SIZE.toMillis();
                    final long windowStart = (record.timestamp() / windowSizeMs) * windowSizeMs;
                    store.put(record.key(), ValueAndTimestamp.make(record.value(), record.timestamp()), windowStart);
                }
            };
        } else {
            storeBuilder = Stores.windowStoreBuilder(windowBytesStoreSupplier, Serdes.Integer(), Serdes.Integer());
            processorSupplier = () -> new ContextualProcessor<Integer, Integer, Void, Void>() {
                @Override
                public void process(final Record<Integer, Integer> record) {
                    final WindowStore<Integer, Integer> store = context().getStateStore(storeBuilder.name());
                    final long windowSizeMs = WINDOW_SIZE.toMillis();
                    final long windowStart = (record.timestamp() / windowSizeMs) * windowSizeMs;
                    store.put(record.key(), record.value(), windowStart);
                }
            };
        }

        if (z) {
            storeBuilder.withCachingEnabled();
        } else {
            storeBuilder.withCachingDisabled();
        }
        if (z2) {
            storeBuilder.withLoggingEnabled(Collections.emptyMap());
        } else {
            storeBuilder.withLoggingDisabled();
        }

        if (storesToTest.global()) {
            streamsBuilder.addGlobalStore(storeBuilder, INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer()), processorSupplier);
        } else {
            streamsBuilder.addStateStore(storeBuilder);
            streamsBuilder.stream(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer())).process(processorSupplier, storeBuilder.name());
        }
    }

    /**
     * Adds a Processor API topology whose processor writes every input record into the given
     * session store as a zero-length session starting and ending at the record timestamp.
     *
     * @param sessionBytesStoreSupplier supplier of the underlying bytes store
     * @param streamsBuilder            builder the topology is added to
     * @param z                         {@code true} to enable caching on the store, {@code false} to disable it
     * @param z2                        {@code true} to enable changelogging (with an empty topic config), {@code false} to disable it
     * @param storesToTest              store variant; only {@code global()} is consulted
     */
    private void setUpSessionPAPITopology(SessionBytesStoreSupplier sessionBytesStoreSupplier, StreamsBuilder streamsBuilder, boolean z, boolean z2, StoresToTest storesToTest) {
        final StoreBuilder<?> storeBuilder = Stores.sessionStoreBuilder(sessionBytesStoreSupplier, Serdes.Integer(), Serdes.Integer());
        final ProcessorSupplier<Integer, Integer, Void, Void> processorSupplier = () -> new ContextualProcessor<Integer, Integer, Void, Void>() {
            @Override
            public void process(final Record<Integer, Integer> record) {
                // getStateStore is generic in its return type; the typed local is what
                // makes put(...) resolvable.
                final SessionStore<Integer, Integer> store = context().getStateStore(storeBuilder.name());
                final SessionWindow session = new SessionWindow(record.timestamp(), record.timestamp());
                store.put(new Windowed<>(record.key(), session), record.value());
            }
        };

        if (z) {
            storeBuilder.withCachingEnabled();
        } else {
            storeBuilder.withCachingDisabled();
        }
        if (z2) {
            storeBuilder.withLoggingEnabled(Collections.emptyMap());
        } else {
            storeBuilder.withLoggingDisabled();
        }

        if (storesToTest.global()) {
            streamsBuilder.addGlobalStore(storeBuilder, INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer()), processorSupplier);
        } else {
            streamsBuilder.addStateStore(storeBuilder);
            streamsBuilder.stream(INPUT_TOPIC_NAME, Consumed.with(Serdes.Integer(), Serdes.Integer())).process(processorSupplier, storeBuilder.name());
        }
    }

    /**
     * Closes the streams instance started for the current test case and then calls its
     * {@code cleanUp()}. Does nothing when no instance was assigned.
     */
    @AfterEach
    public void afterTest() {
        final KafkaStreams streams = this.kafkaStreams;
        if (streams == null) {
            return;
        }
        streams.close();
        streams.cleanUp();
    }

    /** Stops the shared embedded Kafka cluster once all test cases in this class have run. */
    @AfterAll
    public static void after() {
        CLUSTER.stop();
    }

    /**
     * Starts a topology for one parameter combination and runs every query check that applies
     * to the store variant.
     *
     * <p>Global stores are only checked for rejecting all queries. Other stores get the generic
     * checks followed by the key-value, window or session checks selected by the variant's
     * traits and the topology kind. Any {@link AssertionError} is logged and rethrown.
     *
     * @param z            caching flag forwarded to {@code setup}; for non-timestamped DSL
     *                     key-value stores it also selects the expected timestamp
     * @param z2           logging flag forwarded to {@code setup}
     * @param storesToTest store variant under test
     * @param str          topology kind, {@code "DSL"} or {@code "PAPI"}
     */
    @MethodSource({"data"})
    @ParameterizedTest
    public void verifyStore(boolean z, boolean z2, StoresToTest storesToTest, String str) {
        setup(z, z2, storesToTest, str);
        // setup() has already rejected anything other than "DSL" / "PAPI".
        final boolean dsl = "DSL".equals(str);
        // Timestamp of input record 5 (key 2, value 5) as produced in before().
        final long expectedTimestamp = WINDOW_START + Duration.ofMinutes(2L).toMillis() * 5;
        try {
            if (storesToTest.global()) {
                globalShouldRejectAllQueries();
            } else {
                shouldRejectUnknownQuery();
                shouldCollectExecutionInfo();
                shouldCollectExecutionInfoUnderFailure();
                if (storesToTest.keyValue()) {
                    if (storesToTest.timestamped()) {
                        shouldHandleKeyQuery(2, 5);
                        shouldHandleTimestampedKeyQuery(2, ValueAndTimestamp.makeAllowNullable(5, expectedTimestamp));
                        shouldHandleRangeQueries();
                        shouldHandleTimestampedRangeQueries(true);
                    } else {
                        shouldHandleKeyQuery(2, 5);
                        shouldHandleRangeQueries();
                        if (dsl) {
                            shouldHandleTimestampedRangeQueries(false);
                            // NOTE(review): second call in this branch; kept to preserve the
                            // existing sequence of checks -- confirm whether it is intentional.
                            shouldHandleRangeQueries();
                            if (z) {
                                shouldHandleTimestampedKeyQuery(2, ValueAndTimestamp.make(5, expectedTimestamp));
                            } else {
                                // Without caching the expected timestamp is -1.
                                shouldHandleTimestampedKeyQuery(2, ValueAndTimestamp.make(5, -1L));
                            }
                        } else {
                            // Timestamped queries are expected to fail their assertions here.
                            Assertions.assertThrows(AssertionError.class, () -> {
                                shouldHandleTimestampedKeyQuery(2, ValueAndTimestamp.make(5, expectedTimestamp));
                            });
                            Assertions.assertThrows(AssertionError.class, () -> {
                                shouldHandleTimestampedRangeQueries(false);
                            });
                        }
                    }
                }
                if (storesToTest.isWindowed()) {
                    if (storesToTest.timestamped()) {
                        // Typed extractor: with a raw Function the lambda parameter would be
                        // Object and value() would not resolve.
                        final Function<ValueAndTimestamp<Integer>, Integer> valueExtractor = ValueAndTimestamp::value;
                        if (dsl) {
                            shouldHandleWindowKeyDSLQueries(valueExtractor);
                            shouldHandleWindowRangeDSLQueries(valueExtractor);
                        } else {
                            shouldHandleWindowKeyPAPIQueries(valueExtractor);
                            shouldHandleWindowRangePAPIQueries(valueExtractor);
                        }
                    } else {
                        final Function<Integer, Integer> valueExtractor = Function.identity();
                        if (dsl) {
                            shouldHandleWindowKeyDSLQueries(valueExtractor);
                            shouldHandleWindowRangeDSLQueries(valueExtractor);
                        } else {
                            shouldHandleWindowKeyPAPIQueries(valueExtractor);
                            shouldHandleWindowRangePAPIQueries(valueExtractor);
                        }
                    }
                }
                if (storesToTest.isSession()) {
                    if (dsl) {
                        shouldHandleSessionKeyDSLQueries();
                    } else {
                        shouldHandleSessionKeyPAPIQueries();
                    }
                }
            }
        } catch (AssertionError e) {
            LOG.error("Failed assertion", e);
            throw e;
        }
    }

    /**
     * Drives {@code shouldHandleRangeQuery} through every bounded/unbounded combination of
     * lower and upper key, first in ascending and then in descending key order.
     *
     * <p>Each expected list is written in the order in which {@code shouldHandleRangeQuery}
     * collects the values, i.e. partition by partition in ascending partition number.
     */
    private <T> void shouldHandleRangeQueries() {
        final boolean ascending = true;
        final boolean descending = false;

        // NOTE(review): NUM_BROKERS is used below as a key/value literal (presumably 1) - confirm.
        final Integer brokers = Integer.valueOf(NUM_BROKERS);

        final Optional<Integer> open = Optional.empty();
        final Optional<Integer> zero = Optional.of(0);
        final Optional<Integer> brokersKey = Optional.of(brokers);
        final Optional<Integer> three = Optional.of(3);
        final Optional<Integer> four = Optional.of(4);

        // Ascending key order.
        shouldHandleRangeQuery(zero, four, ascending, Arrays.asList(brokers, 5, 9, 3, 7));
        shouldHandleRangeQuery(brokersKey, three, ascending, Arrays.asList(5, 3, 7));
        shouldHandleRangeQuery(three, open, ascending, Arrays.asList(9, 7));
        shouldHandleRangeQuery(open, three, ascending, Arrays.asList(brokers, 5, 3, 7));
        shouldHandleRangeQuery(open, open, ascending, Arrays.asList(brokers, 5, 9, 3, 7));

        // Descending key order.
        shouldHandleRangeQuery(zero, four, descending, Arrays.asList(9, 5, brokers, 7, 3));
        shouldHandleRangeQuery(brokersKey, three, descending, Arrays.asList(5, 7, 3));
        shouldHandleRangeQuery(three, open, descending, Arrays.asList(9, 7));
        shouldHandleRangeQuery(open, three, descending, Arrays.asList(5, brokers, 7, 3));
        shouldHandleRangeQuery(open, open, descending, Arrays.asList(9, 5, brokers, 7, 3));
    }

    /**
     * Drives {@code shouldHandleTimestampedRangeQuery} through every bounded/unbounded
     * combination of lower and upper key, first in ascending and then in descending key order.
     *
     * <p>Keys, values and list positions are written as plain literals. They are record
     * data and positions, not broker counts, so they are deliberately not expressed through
     * {@code NUM_BROKERS}; indexing an expected array with that constant only works while it
     * happens to equal 1.
     *
     * @param z when {@code true} each expected value carries the timestamp
     *          {@code WINDOW_START + value * 2 minutes}; when {@code false} every expected
     *          timestamp is {@code -1}
     */
    private <T> void shouldHandleTimestampedRangeQueries(boolean z) {
        final Optional<Integer> open = Optional.empty();

        // Ascending key order.
        shouldHandleTimestampedRangeQuery(Optional.of(0), Optional.of(4), true, expectedTimestampedValues(z, 1, 5, 9, 3, 7));
        shouldHandleTimestampedRangeQuery(Optional.of(1), Optional.of(3), true, expectedTimestampedValues(z, 5, 3, 7));
        shouldHandleTimestampedRangeQuery(Optional.of(3), open, true, expectedTimestampedValues(z, 9, 7));
        shouldHandleTimestampedRangeQuery(open, Optional.of(3), true, expectedTimestampedValues(z, 1, 5, 3, 7));
        shouldHandleTimestampedRangeQuery(open, open, true, expectedTimestampedValues(z, 1, 5, 9, 3, 7));

        // Descending key order.
        shouldHandleTimestampedRangeQuery(Optional.of(0), Optional.of(4), false, expectedTimestampedValues(z, 9, 5, 1, 7, 3));
        shouldHandleTimestampedRangeQuery(Optional.of(1), Optional.of(3), false, expectedTimestampedValues(z, 5, 7, 3));
        shouldHandleTimestampedRangeQuery(Optional.of(3), open, false, expectedTimestampedValues(z, 9, 7));
        shouldHandleTimestampedRangeQuery(open, Optional.of(3), false, expectedTimestampedValues(z, 5, 1, 7, 3));
        shouldHandleTimestampedRangeQuery(open, open, false, expectedTimestampedValues(z, 9, 5, 1, 7, 3));
    }

    /**
     * Builds the expected {@code ValueAndTimestamp} list for the given values, in the given order.
     *
     * @param withTimestamps whether real timestamps ({@code WINDOW_START + value * 2 minutes})
     *                       or the placeholder {@code -1} are expected
     * @param values         expected values, in expected result order
     * @return a fixed-size list with one entry per value
     */
    private List<ValueAndTimestamp> expectedTimestampedValues(final boolean withTimestamps, final int... values) {
        final long spacingMs = Duration.ofMinutes(2L).toMillis();
        final ValueAndTimestamp[] expected = new ValueAndTimestamp[values.length];
        for (int i = 0; i < values.length; i++) {
            final long timestamp = withTimestamps ? WINDOW_START + (spacingMs * values[i]) : -1L;
            expected[i] = ValueAndTimestamp.make(values[i], timestamp);
        }
        return Arrays.asList(expected);
    }

    /**
     * Issues a series of window key queries (via {@code shouldHandleWindowKeyQuery}) with the
     * expectations used for the DSL variant of the test.
     *
     * @param function extractor handed through unchanged to {@code shouldHandleWindowKeyQuery}
     */
    private <T> void shouldHandleWindowKeyDSLQueries(Function<T, Integer> function) {
        final Instant start = Instant.ofEpochMilli(WINDOW_START);
        final Instant justBeforeStart = Instant.ofEpochMilli(WINDOW_START - 1);
        final Instant plus5 = Instant.ofEpochMilli(WINDOW_START + Duration.ofMinutes(5L).toMillis());
        final Instant plus10 = Instant.ofEpochMilli(WINDOW_START + Duration.ofMinutes(10L).toMillis());
        final Instant plus15 = Instant.ofEpochMilli(WINDOW_START + Duration.ofMinutes(15L).toMillis());
        final Instant plus20 = Instant.ofEpochMilli(WINDOW_START + Duration.ofMinutes(20L).toMillis());
        final Instant plus24 = Instant.ofEpochMilli(WINDOW_START + Duration.ofMinutes(24L).toMillis());

        // Point-in-time ranges: exact window start, just before it, and keys with no data there.
        shouldHandleWindowKeyQuery(0, start, start, function, Utils.mkSet(Integer.valueOf(NUM_BROKERS)));
        shouldHandleWindowKeyQuery(0, justBeforeStart, justBeforeStart, function, Utils.<Integer>mkSet());
        shouldHandleWindowKeyQuery(2, start, start, function, Utils.<Integer>mkSet());
        shouldHandleWindowKeyQuery(999, start, start, function, Utils.<Integer>mkSet());
        shouldHandleWindowKeyQuery(999, justBeforeStart, justBeforeStart, function, Utils.<Integer>mkSet());

        // Wider time ranges.
        shouldHandleWindowKeyQuery(0, start, plus5, function, Utils.mkSet(Integer.valueOf(NUM_BROKERS)));
        shouldHandleWindowKeyQuery(Integer.valueOf(NUM_BROKERS), start, start, function, Utils.mkSet(2));
        shouldHandleWindowKeyQuery(2, plus5, plus10, function, Utils.mkSet(4, 5));
        shouldHandleWindowKeyQuery(3, plus5, plus10, function, Utils.mkSet(13));
        shouldHandleWindowKeyQuery(4, plus15, plus20, function, Utils.mkSet(17));
        shouldHandleWindowKeyQuery(4, plus20, plus24, function, Utils.<Integer>mkSet());
    }

    /**
     * Issues a series of window key queries (via {@code shouldHandleWindowKeyQuery}) with the
     * expectations used for the PAPI variant of the test.
     *
     * @param function extractor handed through unchanged to {@code shouldHandleWindowKeyQuery}
     */
    private <T> void shouldHandleWindowKeyPAPIQueries(Function<T, Integer> function) {
        final Instant start = Instant.ofEpochMilli(WINDOW_START);
        final Instant justBeforeStart = Instant.ofEpochMilli(WINDOW_START - 1);
        final Instant plus5 = Instant.ofEpochMilli(WINDOW_START + Duration.ofMinutes(5L).toMillis());
        final Instant plus10 = Instant.ofEpochMilli(WINDOW_START + Duration.ofMinutes(10L).toMillis());
        final Instant plus15 = Instant.ofEpochMilli(WINDOW_START + Duration.ofMinutes(15L).toMillis());
        final Instant plus20 = Instant.ofEpochMilli(WINDOW_START + Duration.ofMinutes(20L).toMillis());
        final Instant plus24 = Instant.ofEpochMilli(WINDOW_START + Duration.ofMinutes(24L).toMillis());

        // Point-in-time ranges: exact window start, just before it, and keys with no data there.
        shouldHandleWindowKeyQuery(0, start, start, function, Utils.mkSet(Integer.valueOf(NUM_BROKERS)));
        shouldHandleWindowKeyQuery(0, justBeforeStart, justBeforeStart, function, Utils.<Integer>mkSet());
        shouldHandleWindowKeyQuery(2, start, start, function, Utils.<Integer>mkSet());
        shouldHandleWindowKeyQuery(999, start, start, function, Utils.<Integer>mkSet());
        shouldHandleWindowKeyQuery(999, justBeforeStart, justBeforeStart, function, Utils.<Integer>mkSet());

        // Wider time ranges.
        shouldHandleWindowKeyQuery(0, start, plus5, function, Utils.mkSet(Integer.valueOf(NUM_BROKERS)));
        shouldHandleWindowKeyQuery(Integer.valueOf(NUM_BROKERS), start, start, function, Utils.mkSet(2));
        shouldHandleWindowKeyQuery(2, plus5, plus10, function, Utils.mkSet(4, 5));
        shouldHandleWindowKeyQuery(3, plus5, plus10, function, Utils.mkSet(7));
        shouldHandleWindowKeyQuery(4, plus15, plus20, function, Utils.mkSet(9));
        shouldHandleWindowKeyQuery(4, plus20, plus24, function, Utils.<Integer>mkSet());
    }

    /**
     * Issues window-start range queries (via {@code shouldHandleWindowRangeQuery}) with the
     * DSL expectations, then checks that a key-only {@code WindowRangeQuery} is rejected by
     * every partition with {@code UNKNOWN_QUERY_TYPE} and the expected message.
     *
     * @param function extractor handed through unchanged to {@code shouldHandleWindowRangeQuery}
     */
    private <T> void shouldHandleWindowRangeDSLQueries(Function<T, Integer> function) {
        final long windowSizeMs = WINDOW_SIZE.toMillis();
        // RECORD_TIME truncated to a multiple of the window size.
        final long base = (RECORD_TIME / windowSizeMs) * windowSizeMs;

        final Instant justBefore = Instant.ofEpochMilli(base - 1);
        final Instant at0 = Instant.ofEpochMilli(base);
        final Instant at5 = Instant.ofEpochMilli(base + Duration.ofMinutes(5L).toMillis());
        final Instant at10 = Instant.ofEpochMilli(base + Duration.ofMinutes(10L).toMillis());
        final Instant at15 = Instant.ofEpochMilli(base + Duration.ofMinutes(15L).toMillis());
        final Instant at20 = Instant.ofEpochMilli(base + Duration.ofMinutes(20L).toMillis());
        final Instant at25 = Instant.ofEpochMilli(base + Duration.ofMinutes(25L).toMillis());

        shouldHandleWindowRangeQuery(justBefore, justBefore, function, Utils.<Integer>mkSet());
        shouldHandleWindowRangeQuery(at0, at0, function, Utils.mkSet(Integer.valueOf(NUM_BROKERS), 2));
        shouldHandleWindowRangeQuery(at0, at5, function, Utils.mkSet(Integer.valueOf(NUM_BROKERS), 2, 3, 4));
        shouldHandleWindowRangeQuery(at5, at10, function, Utils.mkSet(3, 4, 5, 13));
        shouldHandleWindowRangeQuery(at10, at15, function, Utils.mkSet(17, 5, 13));
        shouldHandleWindowRangeQuery(at15, at20, function, Utils.mkSet(17));
        shouldHandleWindowRangeQuery(at20, at25, function, Utils.<Integer>mkSet());
        shouldHandleWindowRangeQuery(at5, at15, function, Utils.mkSet(17, 3, 4, 5, 13));
        shouldHandleWindowRangeQuery(at0, at15, function, Utils.mkSet(Integer.valueOf(NUM_BROKERS), 17, 2, 3, 4, 5, 13));

        // A key-only WindowRangeQuery is expected to fail on every partition.
        final StateQueryResult keyOnlyResult = IntegrationTestUtils.iqv2WaitForResult(
            this.kafkaStreams,
            StateQueryRequest.inStore(STORE_NAME)
                .withQuery(WindowRangeQuery.withKey(2))
                .withPartitions(Utils.mkSet(0, Integer.valueOf(NUM_BROKERS)))
                .withPositionBound(PositionBound.at(INPUT_POSITION)));
        if (keyOnlyResult.getGlobalResult() != null) {
            Assertions.fail("global tables aren't implemented");
            return;
        }
        final Map partitionResults = keyOnlyResult.getPartitionResults();
        for (final Object key : partitionResults.keySet()) {
            final int partition = (Integer) key;
            final QueryResult partitionResult = (QueryResult) partitionResults.get(partition);
            if (partitionResult.isFailure()) {
                MatcherAssert.assertThat(partitionResult.getFailureReason(), Matchers.is(FailureReason.UNKNOWN_QUERY_TYPE));
                MatcherAssert.assertThat(partitionResult.getFailureMessage(), Matchers.matchesPattern("This store \\(class org.apache.kafka.streams.state.internals.Metered.*WindowStore\\) doesn't know how to execute the given query \\(WindowRangeQuery\\{key=Optional\\[2], timeFrom=Optional.empty, timeTo=Optional.empty}\\) because WindowStores only supports WindowRangeQuery.withWindowStartRange\\. Contact the store maintainer if you need support for a new query type\\."));
            } else {
                throw new AssertionError(partitionResults.toString());
            }
        }
    }

    /**
     * Issues window-start range queries (via {@code shouldHandleWindowRangeQuery}) with the
     * PAPI expectations, then checks that a key-only {@code WindowRangeQuery} is rejected by
     * every partition with {@code UNKNOWN_QUERY_TYPE} and the expected message.
     *
     * @param function extractor handed through unchanged to {@code shouldHandleWindowRangeQuery}
     */
    private <T> void shouldHandleWindowRangePAPIQueries(Function<T, Integer> function) {
        final long windowSizeMs = WINDOW_SIZE.toMillis();
        // RECORD_TIME truncated to a multiple of the window size.
        final long base = (RECORD_TIME / windowSizeMs) * windowSizeMs;

        final Instant justBefore = Instant.ofEpochMilli(base - 1);
        final Instant at0 = Instant.ofEpochMilli(base);
        final Instant at5 = Instant.ofEpochMilli(base + Duration.ofMinutes(5L).toMillis());
        final Instant at10 = Instant.ofEpochMilli(base + Duration.ofMinutes(10L).toMillis());
        final Instant at15 = Instant.ofEpochMilli(base + Duration.ofMinutes(15L).toMillis());
        final Instant at20 = Instant.ofEpochMilli(base + Duration.ofMinutes(20L).toMillis());
        final Instant at25 = Instant.ofEpochMilli(base + Duration.ofMinutes(25L).toMillis());

        shouldHandleWindowRangeQuery(justBefore, justBefore, function, Utils.<Integer>mkSet());
        shouldHandleWindowRangeQuery(at0, at0, function, Utils.mkSet(Integer.valueOf(NUM_BROKERS), 2));
        shouldHandleWindowRangeQuery(at0, at5, function, Utils.mkSet(Integer.valueOf(NUM_BROKERS), 2, 3, 4));
        shouldHandleWindowRangeQuery(at5, at10, function, Utils.mkSet(3, 4, 5, 7));
        shouldHandleWindowRangeQuery(at10, at15, function, Utils.mkSet(5, 7, 9));
        shouldHandleWindowRangeQuery(at15, at20, function, Utils.mkSet(9));
        shouldHandleWindowRangeQuery(at20, at25, function, Utils.<Integer>mkSet());
        shouldHandleWindowRangeQuery(at5, at15, function, Utils.mkSet(3, 4, 5, 7, 9));
        shouldHandleWindowRangeQuery(at0, at15, function, Utils.mkSet(Integer.valueOf(NUM_BROKERS), 2, 3, 4, 5, 7, 9));

        // A key-only WindowRangeQuery is expected to fail on every partition.
        final StateQueryResult keyOnlyResult = IntegrationTestUtils.iqv2WaitForResult(
            this.kafkaStreams,
            StateQueryRequest.inStore(STORE_NAME)
                .withQuery(WindowRangeQuery.withKey(2))
                .withPartitions(Utils.mkSet(0, Integer.valueOf(NUM_BROKERS)))
                .withPositionBound(PositionBound.at(INPUT_POSITION)));
        if (keyOnlyResult.getGlobalResult() != null) {
            Assertions.fail("global tables aren't implemented");
            return;
        }
        final Map partitionResults = keyOnlyResult.getPartitionResults();
        for (final Object key : partitionResults.keySet()) {
            final int partition = (Integer) key;
            final QueryResult partitionResult = (QueryResult) partitionResults.get(partition);
            if (partitionResult.isFailure()) {
                MatcherAssert.assertThat(partitionResult.getFailureReason(), Matchers.is(FailureReason.UNKNOWN_QUERY_TYPE));
                MatcherAssert.assertThat(partitionResult.getFailureMessage(), Matchers.matchesPattern("This store \\(class org.apache.kafka.streams.state.internals.Metered.*WindowStore\\) doesn't know how to execute the given query \\(WindowRangeQuery\\{key=Optional\\[2], timeFrom=Optional.empty, timeTo=Optional.empty}\\) because WindowStores only supports WindowRangeQuery.withWindowStartRange\\. Contact the store maintainer if you need support for a new query type\\."));
            } else {
                throw new AssertionError(partitionResults.toString());
            }
        }
    }

    /**
     * Issues per-key session queries (via {@code shouldHandleSessionRangeQuery}) with the DSL
     * expectations, then checks that a window-start-range {@code WindowRangeQuery} is rejected
     * by every partition with {@code UNKNOWN_QUERY_TYPE} and the expected message.
     */
    private <T> void shouldHandleSessionKeyDSLQueries() {
        shouldHandleSessionRangeQuery(0, Utils.mkSet(Integer.valueOf(NUM_BROKERS)));
        shouldHandleSessionRangeQuery(Integer.valueOf(NUM_BROKERS), Utils.mkSet(5));
        shouldHandleSessionRangeQuery(2, Utils.mkSet(9));
        shouldHandleSessionRangeQuery(3, Utils.mkSet(13));
        shouldHandleSessionRangeQuery(4, Utils.mkSet(17));
        shouldHandleSessionRangeQuery(999, Utils.<Integer>mkSet());

        // A time-range WindowRangeQuery is expected to fail on every partition.
        final Instant epoch = Instant.ofEpochMilli(0L);
        final StateQueryResult timeRangeResult = IntegrationTestUtils.iqv2WaitForResult(
            this.kafkaStreams,
            StateQueryRequest.inStore(STORE_NAME)
                .withQuery(WindowRangeQuery.withWindowStartRange(epoch, Instant.ofEpochMilli(0L)))
                .withPartitions(Utils.mkSet(0, Integer.valueOf(NUM_BROKERS)))
                .withPositionBound(PositionBound.at(INPUT_POSITION)));
        if (timeRangeResult.getGlobalResult() != null) {
            Assertions.fail("global tables aren't implemented");
            return;
        }
        final Map partitionResults = timeRangeResult.getPartitionResults();
        for (final Object key : partitionResults.keySet()) {
            final int partition = (Integer) key;
            final QueryResult partitionResult = (QueryResult) partitionResults.get(partition);
            if (partitionResult.isFailure()) {
                MatcherAssert.assertThat(partitionResult.getFailureReason(), Matchers.is(FailureReason.UNKNOWN_QUERY_TYPE));
                MatcherAssert.assertThat(partitionResult.getFailureMessage(), Matchers.is("This store (class org.apache.kafka.streams.state.internals.MeteredSessionStore) doesn't know how to execute the given query (WindowRangeQuery{key=Optional.empty, timeFrom=Optional[1970-01-01T00:00:00Z], timeTo=Optional[1970-01-01T00:00:00Z]}) because SessionStores only support WindowRangeQuery.withKey. Contact the store maintainer if you need support for a new query type."));
            } else {
                throw new AssertionError(partitionResults.toString());
            }
        }
    }

    /**
     * Issues per-key session queries (via {@code shouldHandleSessionRangeQuery}) with the PAPI
     * expectations, then checks that a window-start-range {@code WindowRangeQuery} is rejected
     * by every partition with {@code UNKNOWN_QUERY_TYPE} and the expected message.
     */
    private <T> void shouldHandleSessionKeyPAPIQueries() {
        shouldHandleSessionRangeQuery(0, Utils.mkSet(0, Integer.valueOf(NUM_BROKERS)));
        shouldHandleSessionRangeQuery(Integer.valueOf(NUM_BROKERS), Utils.mkSet(2, 3));
        shouldHandleSessionRangeQuery(2, Utils.mkSet(4, 5));
        shouldHandleSessionRangeQuery(3, Utils.mkSet(6, 7));
        shouldHandleSessionRangeQuery(4, Utils.mkSet(8, 9));
        shouldHandleSessionRangeQuery(999, Utils.<Integer>mkSet());

        // A time-range WindowRangeQuery is expected to fail on every partition.
        final Instant epoch = Instant.ofEpochMilli(0L);
        final StateQueryResult timeRangeResult = IntegrationTestUtils.iqv2WaitForResult(
            this.kafkaStreams,
            StateQueryRequest.inStore(STORE_NAME)
                .withQuery(WindowRangeQuery.withWindowStartRange(epoch, Instant.ofEpochMilli(0L)))
                .withPartitions(Utils.mkSet(0, Integer.valueOf(NUM_BROKERS)))
                .withPositionBound(PositionBound.at(INPUT_POSITION)));
        if (timeRangeResult.getGlobalResult() != null) {
            Assertions.fail("global tables aren't implemented");
            return;
        }
        final Map partitionResults = timeRangeResult.getPartitionResults();
        for (final Object key : partitionResults.keySet()) {
            final int partition = (Integer) key;
            final QueryResult partitionResult = (QueryResult) partitionResults.get(partition);
            if (partitionResult.isFailure()) {
                MatcherAssert.assertThat(partitionResult.getFailureReason(), Matchers.is(FailureReason.UNKNOWN_QUERY_TYPE));
                MatcherAssert.assertThat(partitionResult.getFailureMessage(), Matchers.is("This store (class org.apache.kafka.streams.state.internals.MeteredSessionStore) doesn't know how to execute the given query (WindowRangeQuery{key=Optional.empty, timeFrom=Optional[1970-01-01T00:00:00Z], timeTo=Optional[1970-01-01T00:00:00Z]}) because SessionStores only support WindowRangeQuery.withKey. Contact the store maintainer if you need support for a new query type."));
            } else {
                throw new AssertionError(partitionResults.toString());
            }
        }
    }

    /**
     * Sends a {@code KeyQuery} to the store and asserts that the global result is a failure
     * with reason {@code UNKNOWN_QUERY_TYPE} and the fixed "global stores do not yet support"
     * message.
     */
    private void globalShouldRejectAllQueries() {
        final StateQueryResult response = this.kafkaStreams.query(
            StateQueryRequest.inStore(STORE_NAME).withQuery(KeyQuery.withKey(Integer.valueOf(NUM_BROKERS))));
        final QueryResult globalResult = response.getGlobalResult();

        MatcherAssert.assertThat(globalResult.isFailure(), Matchers.is(true));
        MatcherAssert.assertThat(globalResult.getFailureReason(), Matchers.is(FailureReason.UNKNOWN_QUERY_TYPE));
        MatcherAssert.assertThat(
            globalResult.getFailureMessage(),
            Matchers.is("Global stores do not yet support the KafkaStreams#query API. Use KafkaStreams#store instead."));
    }

    /**
     * Sends an {@code UnknownQuery} to partitions 0 and {@code NUM_BROKERS} and, through
     * {@code makeAssertions}, checks each partition result: it must be a failure with reason
     * {@code UNKNOWN_QUERY_TYPE}, carry the expected message, throw
     * {@code IllegalArgumentException} from {@code getResult()}, and have empty execution info.
     */
    public void shouldRejectUnknownQuery() {
        final StateQueryRequest unknownRequest = StateQueryRequest.inStore(STORE_NAME).withQuery(new UnknownQuery());
        final Set<Integer> partitions = Utils.mkSet(0, Integer.valueOf(NUM_BROKERS));

        makeAssertions(
            partitions,
            IntegrationTestUtils.iqv2WaitForPartitions(this.kafkaStreams, unknownRequest, partitions),
            partitionResult -> {
                // Failure / success flags must be consistent.
                MatcherAssert.assertThat(partitionResult.isFailure(), Matchers.is(true));
                MatcherAssert.assertThat(partitionResult.isSuccess(), Matchers.is(false));

                // Failure details.
                MatcherAssert.assertThat(partitionResult.getFailureReason(), Matchers.is(FailureReason.UNKNOWN_QUERY_TYPE));
                MatcherAssert.assertThat(
                    partitionResult.getFailureMessage(),
                    Matchers.matchesPattern("This store (.*) doesn't know how to execute the given query (.*). Contact the store maintainer if you need support for a new query type."));

                // Asking a failed result for its payload must throw.
                Assertions.assertThrows(IllegalArgumentException.class, partitionResult::getResult);
                MatcherAssert.assertThat(partitionResult.getExecutionInfo(), Matchers.is(Matchers.empty()));
            });
    }

    /**
     * Runs a {@code KeyQuery} for {@code num} against partitions 0 and {@code NUM_BROKERS},
     * bounded at {@code INPUT_POSITION}, and asserts on the single partition result.
     *
     * <p>The result must be a success, must throw {@code IllegalArgumentException} from the
     * failure accessors, must equal {@code num2}, must have empty execution info, and must
     * report position {@code POSITION_0}.
     *
     * @param num  key to query
     * @param num2 expected value for that key
     * @throws AssertionError if the partition result is a failure or any expectation is not met
     */
    public <V> void shouldHandleKeyQuery(Integer num, Integer num2) {
        final StateQueryRequest request = StateQueryRequest.inStore(STORE_NAME)
            .withQuery(KeyQuery.withKey(num))
            .withPartitions(Utils.mkSet(0, Integer.valueOf(NUM_BROKERS)))
            .withPositionBound(PositionBound.at(INPUT_POSITION));
        final StateQueryResult response = IntegrationTestUtils.iqv2WaitForResult(this.kafkaStreams, request);
        final QueryResult partitionResult = response.getOnlyPartitionResult();

        if (partitionResult.isFailure()) {
            throw new AssertionError(partitionResult.toString());
        }
        MatcherAssert.assertThat(partitionResult.isSuccess(), Matchers.is(true));

        // A successful result must refuse to hand out failure details.
        Assertions.assertThrows(IllegalArgumentException.class, partitionResult::getFailureReason);
        Assertions.assertThrows(IllegalArgumentException.class, partitionResult::getFailureMessage);

        final Integer actual = (Integer) partitionResult.getResult();
        MatcherAssert.assertThat(actual, Matchers.is(num2));
        MatcherAssert.assertThat(partitionResult.getExecutionInfo(), Matchers.is(Matchers.empty()));
        MatcherAssert.assertThat(partitionResult.getPosition(), Matchers.is(POSITION_0));
    }

    /**
     * Runs a {@code TimestampedKeyQuery} for {@code num} against partitions 0 and
     * {@code NUM_BROKERS}, bounded at {@code INPUT_POSITION}, and asserts on the single
     * partition result.
     *
     * <p>The result must be non-null and a success, must throw
     * {@code IllegalArgumentException} from the failure accessors, must equal
     * {@code valueAndTimestamp}, must have empty execution info, and must report position
     * {@code POSITION_0}.
     *
     * @param num               key to query
     * @param valueAndTimestamp expected value-and-timestamp for that key
     * @throws AssertionError if the partition result is null, is a failure, or any expectation
     *                        is not met
     */
    public <V> void shouldHandleTimestampedKeyQuery(Integer num, ValueAndTimestamp valueAndTimestamp) {
        final StateQueryRequest request = StateQueryRequest.inStore(STORE_NAME)
            .withQuery(TimestampedKeyQuery.withKey(num))
            .withPartitions(Utils.mkSet(0, Integer.valueOf(NUM_BROKERS)))
            .withPositionBound(PositionBound.at(INPUT_POSITION));
        final StateQueryResult response = IntegrationTestUtils.iqv2WaitForResult(this.kafkaStreams, request);
        final QueryResult partitionResult = response.getOnlyPartitionResult();

        if (partitionResult == null) {
            throw new AssertionError("cannot use this query type to query result");
        }
        if (partitionResult.isFailure()) {
            throw new AssertionError(partitionResult.toString());
        }
        MatcherAssert.assertThat(partitionResult.isSuccess(), Matchers.is(true));

        // A successful result must refuse to hand out failure details.
        Assertions.assertThrows(IllegalArgumentException.class, partitionResult::getFailureReason);
        Assertions.assertThrows(IllegalArgumentException.class, partitionResult::getFailureMessage);

        final ValueAndTimestamp actual = (ValueAndTimestamp) partitionResult.getResult();
        MatcherAssert.assertThat(actual, Matchers.is(valueAndTimestamp));
        MatcherAssert.assertThat(partitionResult.getExecutionInfo(), Matchers.is(Matchers.empty()));
        MatcherAssert.assertThat(partitionResult.getPosition(), Matchers.is(POSITION_0));
    }

    /**
     * Runs a {@code RangeQuery} over partitions 0 and {@code NUM_BROKERS}, bounded at
     * {@code INPUT_POSITION}, and asserts on the collected values.
     *
     * <p>Partition results are visited in ascending partition number and the values of each
     * partition's iterator are appended to one list, which is then compared to {@code list}.
     * Every partition result must be a success, must throw {@code IllegalArgumentException}
     * from the failure accessors, and must have empty execution info. The overall position
     * must equal {@code INPUT_POSITION}.
     *
     * @param optional  lower key bound; empty means unbounded ({@code null} is passed on)
     * @param optional2 upper key bound; empty means unbounded ({@code null} is passed on)
     * @param z         {@code true} for the default key order, {@code false} to request
     *                  descending keys
     * @param list      expected values in the order described above
     * @throws AssertionError if any partition result is a failure or any expectation is not met
     */
    public <V> void shouldHandleRangeQuery(Optional<Integer> optional, Optional<Integer> optional2, boolean z, List<Integer> list) {
        RangeQuery withRange = RangeQuery.withRange(optional.orElse(null), optional2.orElse(null));
        if (!z) {
            withRange = withRange.withDescendingKeys();
        }
        final StateQueryResult result = IntegrationTestUtils.iqv2WaitForResult(
            this.kafkaStreams,
            StateQueryRequest.inStore(STORE_NAME)
                .withQuery(withRange)
                .withPartitions(Utils.mkSet(new Integer[]{0, Integer.valueOf(NUM_BROKERS)}))
                .withPositionBound(PositionBound.at(INPUT_POSITION)));
        if (result.getGlobalResult() != null) {
            Assertions.fail("global tables aren't implemented");
            return;
        }

        final List<Integer> actualValues = new ArrayList<>();
        final Map partitionResults = result.getPartitionResults();
        // TreeSet gives a deterministic, ascending partition order for the concatenated values.
        for (final Object partition : new TreeSet(partitionResults.keySet())) {
            final QueryResult partitionResult = (QueryResult) partitionResults.get(partition);
            if (partitionResult.isFailure()) {
                throw new AssertionError(partitionResults.toString());
            }
            MatcherAssert.assertThat(Boolean.valueOf(partitionResult.isSuccess()), Matchers.is(true));

            // A successful result must refuse to hand out failure details.
            Assertions.assertThrows(IllegalArgumentException.class, partitionResult::getFailureReason);
            Assertions.assertThrows(IllegalArgumentException.class, partitionResult::getFailureMessage);

            // try-with-resources closes the iterator on every exit path, including a
            // failure inside hasNext(), which a hand-written close could miss.
            try (KeyValueIterator iterator = (KeyValueIterator) partitionResult.getResult()) {
                while (iterator.hasNext()) {
                    actualValues.add((Integer) ((KeyValue) iterator.next()).value);
                }
            }
            MatcherAssert.assertThat(partitionResult.getExecutionInfo(), Matchers.is(Matchers.empty()));
        }
        MatcherAssert.assertThat("Result:" + result, actualValues, Matchers.is(list));
        MatcherAssert.assertThat("Result:" + result, result.getPosition(), Matchers.is(INPUT_POSITION));
    }

    /**
     * Runs a {@link TimestampedRangeQuery} against {@code STORE_NAME} on partitions 0 and
     * {@code NUM_BROKERS}, bounded at {@code INPUT_POSITION}, and verifies the outcome.
     *
     * <p>For every partition (visited in ascending partition order) the result must be a success,
     * asking it for a failure reason or message must throw {@link IllegalArgumentException}, and
     * its execution info must be empty. The values read from all partitions, concatenated in
     * visiting order, must equal {@code list}, and the overall position must equal
     * {@code INPUT_POSITION}.
     *
     * @param optional  first key bound handed to {@code withRange}; empty is passed as {@code null}
     * @param optional2 second key bound handed to {@code withRange}; empty is passed as {@code null}
     * @param z         when {@code false}, the query is switched to {@code withDescendingKeys()};
     *                  when {@code true}, it is used as built
     * @param list      the expected values, in the order they should be read
     * @param <V>       value type wrapped by the {@link ValueAndTimestamp} entries of the store
     */
    public <V> void shouldHandleTimestampedRangeQuery(Optional<Integer> optional, Optional<Integer> optional2, boolean z, List<ValueAndTimestamp> list) {
        TimestampedRangeQuery<Integer, V> query =
            TimestampedRangeQuery.withRange(optional.orElse(null), optional2.orElse(null));
        if (!z) {
            query = query.withDescendingKeys();
        }
        final StateQueryRequest<KeyValueIterator<Integer, ValueAndTimestamp<V>>> request =
            StateQueryRequest.inStore(STORE_NAME)
                .withQuery(query)
                .withPartitions(Utils.mkSet(0, NUM_BROKERS))
                .withPositionBound(PositionBound.at(INPUT_POSITION));
        final StateQueryResult<KeyValueIterator<Integer, ValueAndTimestamp<V>>> result =
            IntegrationTestUtils.iqv2WaitForResult(this.kafkaStreams, request);

        if (result.getGlobalResult() != null) {
            Assertions.fail("global tables aren't implemented");
            return;
        }

        // The element type stays raw only because the expected list is declared that way.
        final List<ValueAndTimestamp> actualValues = new ArrayList<>();
        final Map<Integer, QueryResult<KeyValueIterator<Integer, ValueAndTimestamp<V>>>> partitionResults =
            result.getPartitionResults();
        // Sort the partitions so the collected values have a deterministic order.
        for (final Integer partition : new TreeSet<>(partitionResults.keySet())) {
            final QueryResult<KeyValueIterator<Integer, ValueAndTimestamp<V>>> partitionResult =
                partitionResults.get(partition);
            if (partitionResult.isFailure()) {
                throw new AssertionError(partitionResults.toString());
            }
            MatcherAssert.assertThat(partitionResult.isSuccess(), Matchers.is(true));
            Assertions.assertThrows(IllegalArgumentException.class, partitionResult::getFailureReason);
            Assertions.assertThrows(IllegalArgumentException.class, partitionResult::getFailureMessage);

            // try-with-resources closes the iterator even when hasNext() or next() throws.
            try (KeyValueIterator<Integer, ValueAndTimestamp<V>> iterator = partitionResult.getResult()) {
                while (iterator.hasNext()) {
                    actualValues.add(iterator.next().value);
                }
            }
            MatcherAssert.assertThat(partitionResult.getExecutionInfo(), Matchers.is(Matchers.empty()));
        }
        MatcherAssert.assertThat("Result:" + result, actualValues, Matchers.is(list));
        MatcherAssert.assertThat("Result:" + result, result.getPosition(), Matchers.is(INPUT_POSITION));
    }

    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Runs a {@link WindowKeyQuery} against {@code STORE_NAME} on partitions 0 and
     * {@code NUM_BROKERS}, bounded at {@code INPUT_POSITION}, and verifies the outcome.
     *
     * <p>Every partition result must be a success, asking it for a failure reason or message must
     * throw {@link IllegalArgumentException}, and its execution info must be empty. The integers
     * extracted from all returned values must equal {@code set}, and the overall position must
     * equal {@code INPUT_POSITION}.
     *
     * @param num      key handed to {@code withKeyAndWindowStartRange}
     * @param instant  first time bound handed to {@code withKeyAndWindowStartRange}
     * @param instant2 second time bound handed to {@code withKeyAndWindowStartRange}
     * @param function maps each stored value to the integer that is compared against {@code set}
     * @param set      the expected extracted integers, irrespective of order
     * @param <V>      value type held by the queried store
     */
    public <V> void shouldHandleWindowKeyQuery(Integer num, Instant instant, Instant instant2, Function<V, Integer> function, Set<Integer> set) {
        final WindowKeyQuery<Integer, V> query =
            WindowKeyQuery.withKeyAndWindowStartRange(num, instant, instant2);
        final StateQueryRequest<WindowStoreIterator<V>> request =
            StateQueryRequest.inStore(STORE_NAME)
                .withQuery(query)
                .withPartitions(Utils.mkSet(0, NUM_BROKERS))
                .withPositionBound(PositionBound.at(INPUT_POSITION));
        final StateQueryResult<WindowStoreIterator<V>> result =
            IntegrationTestUtils.iqv2WaitForResult(this.kafkaStreams, request);

        if (result.getGlobalResult() != null) {
            Assertions.fail("global tables aren't implemented");
            return;
        }

        final Set<Integer> actualValues = new HashSet<>();
        final Map<Integer, QueryResult<WindowStoreIterator<V>>> partitionResults = result.getPartitionResults();
        for (final QueryResult<WindowStoreIterator<V>> partitionResult : partitionResults.values()) {
            if (partitionResult.isFailure()) {
                throw new AssertionError(partitionResults.toString());
            }
            MatcherAssert.assertThat(partitionResult.isSuccess(), Matchers.is(true));
            Assertions.assertThrows(IllegalArgumentException.class, partitionResult::getFailureReason);
            Assertions.assertThrows(IllegalArgumentException.class, partitionResult::getFailureMessage);

            // try-with-resources closes the iterator even when hasNext() or next() throws.
            try (WindowStoreIterator<V> iterator = partitionResult.getResult()) {
                while (iterator.hasNext()) {
                    actualValues.add(function.apply(iterator.next().value));
                }
            }
            MatcherAssert.assertThat(partitionResult.getExecutionInfo(), Matchers.is(Matchers.empty()));
        }
        MatcherAssert.assertThat("Result:" + result, actualValues, Matchers.is(set));
        MatcherAssert.assertThat("Result:" + result, result.getPosition(), Matchers.is(INPUT_POSITION));
    }

    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Runs a {@link WindowRangeQuery} built with {@code withWindowStartRange} against
     * {@code STORE_NAME} on partitions 0 and {@code NUM_BROKERS}, bounded at
     * {@code INPUT_POSITION}, and verifies the outcome.
     *
     * <p>Every partition result must be a success, asking it for a failure reason or message must
     * throw {@link IllegalArgumentException}, and its execution info must be empty. The integers
     * extracted from all returned values must equal {@code set}, and the overall position must
     * equal {@code INPUT_POSITION}.
     *
     * @param instant  first time bound handed to {@code withWindowStartRange}
     * @param instant2 second time bound handed to {@code withWindowStartRange}
     * @param function maps each stored value to the integer that is compared against {@code set}
     * @param set      the expected extracted integers, irrespective of order
     * @param <V>      value type held by the queried store
     */
    public <V> void shouldHandleWindowRangeQuery(Instant instant, Instant instant2, Function<V, Integer> function, Set<Integer> set) {
        final WindowRangeQuery<Integer, V> query = WindowRangeQuery.withWindowStartRange(instant, instant2);
        final StateQueryRequest<KeyValueIterator<Windowed<Integer>, V>> request =
            StateQueryRequest.inStore(STORE_NAME)
                .withQuery(query)
                .withPartitions(Utils.mkSet(0, NUM_BROKERS))
                .withPositionBound(PositionBound.at(INPUT_POSITION));
        final StateQueryResult<KeyValueIterator<Windowed<Integer>, V>> result =
            IntegrationTestUtils.iqv2WaitForResult(this.kafkaStreams, request);

        if (result.getGlobalResult() != null) {
            Assertions.fail("global tables aren't implemented");
            return;
        }

        final Set<Integer> actualValues = new HashSet<>();
        final Map<Integer, QueryResult<KeyValueIterator<Windowed<Integer>, V>>> partitionResults =
            result.getPartitionResults();
        for (final QueryResult<KeyValueIterator<Windowed<Integer>, V>> partitionResult : partitionResults.values()) {
            if (partitionResult.isFailure()) {
                throw new AssertionError(partitionResults.toString());
            }
            MatcherAssert.assertThat(partitionResult.isSuccess(), Matchers.is(true));
            Assertions.assertThrows(IllegalArgumentException.class, partitionResult::getFailureReason);
            Assertions.assertThrows(IllegalArgumentException.class, partitionResult::getFailureMessage);

            // try-with-resources closes the iterator even when hasNext() or next() throws.
            try (KeyValueIterator<Windowed<Integer>, V> iterator = partitionResult.getResult()) {
                while (iterator.hasNext()) {
                    actualValues.add(function.apply(iterator.next().value));
                }
            }
            MatcherAssert.assertThat(partitionResult.getExecutionInfo(), Matchers.is(Matchers.empty()));
        }
        MatcherAssert.assertThat("Result:" + result, actualValues, Matchers.is(set));
        MatcherAssert.assertThat("Result:" + result, result.getPosition(), Matchers.is(INPUT_POSITION));
    }

    /**
     * Runs a {@link WindowRangeQuery} built with {@code withKey} against {@code STORE_NAME} on
     * partitions 0 and {@code NUM_BROKERS}, bounded at {@code INPUT_POSITION}, and verifies the
     * outcome.
     *
     * <p>Every partition result must be a success, asking it for a failure reason or message must
     * throw {@link IllegalArgumentException}, and its execution info must be empty. The returned
     * values, cast to {@link Integer}, must equal {@code set}, and the overall position must equal
     * {@code INPUT_POSITION}.
     *
     * @param num key handed to {@code WindowRangeQuery.withKey}
     * @param set the expected values, irrespective of order
     * @param <V> declared value type of the query; each value is cast to {@link Integer} when read
     */
    public <V> void shouldHandleSessionRangeQuery(Integer num, Set<Integer> set) {
        final WindowRangeQuery<Integer, V> query = WindowRangeQuery.withKey(num);
        final StateQueryRequest<KeyValueIterator<Windowed<Integer>, V>> request =
            StateQueryRequest.inStore(STORE_NAME)
                .withQuery(query)
                .withPartitions(Utils.mkSet(0, NUM_BROKERS))
                .withPositionBound(PositionBound.at(INPUT_POSITION));
        final StateQueryResult<KeyValueIterator<Windowed<Integer>, V>> result =
            IntegrationTestUtils.iqv2WaitForResult(this.kafkaStreams, request);

        if (result.getGlobalResult() != null) {
            Assertions.fail("global tables aren't implemented");
            return;
        }

        final Set<Integer> actualValues = new HashSet<>();
        final Map<Integer, QueryResult<KeyValueIterator<Windowed<Integer>, V>>> partitionResults =
            result.getPartitionResults();
        for (final QueryResult<KeyValueIterator<Windowed<Integer>, V>> partitionResult : partitionResults.values()) {
            if (partitionResult.isFailure()) {
                throw new AssertionError(partitionResults.toString());
            }
            MatcherAssert.assertThat(partitionResult.isSuccess(), Matchers.is(true));
            Assertions.assertThrows(IllegalArgumentException.class, partitionResult::getFailureReason);
            Assertions.assertThrows(IllegalArgumentException.class, partitionResult::getFailureMessage);

            // try-with-resources closes the iterator even when hasNext() or next() throws.
            try (KeyValueIterator<Windowed<Integer>, V> iterator = partitionResult.getResult()) {
                while (iterator.hasNext()) {
                    actualValues.add((Integer) iterator.next().value);
                }
            }
            MatcherAssert.assertThat(partitionResult.getExecutionInfo(), Matchers.is(Matchers.empty()));
        }
        MatcherAssert.assertThat("Result:" + result, actualValues, Matchers.is(set));
        MatcherAssert.assertThat("Result:" + result, result.getPosition(), Matchers.is(INPUT_POSITION));
    }

    /**
     * Issues a {@link KeyQuery} for key {@code NUM_BROKERS} with execution info enabled on
     * partitions 0 and {@code NUM_BROKERS}, and checks via {@code makeAssertions} that every
     * inspected result carries non-empty execution info.
     */
    public void shouldCollectExecutionInfo() {
        // The value type is irrelevant here: only the execution info of each result is inspected.
        final KeyQuery<Integer, Object> query = KeyQuery.withKey(NUM_BROKERS);
        final Set<Integer> partitions = Utils.mkSet(0, NUM_BROKERS);
        final StateQueryRequest<Object> request =
            StateQueryRequest.inStore(STORE_NAME)
                .withQuery(query)
                .enableExecutionInfo()
                .withPartitions(partitions)
                .withPositionBound(PositionBound.at(INPUT_POSITION));
        final StateQueryResult<Object> result =
            IntegrationTestUtils.iqv2WaitForResult(this.kafkaStreams, request);

        makeAssertions(
            partitions,
            result,
            partitionResult ->
                MatcherAssert.assertThat(partitionResult.getExecutionInfo(), Matchers.not(Matchers.empty())));
    }

    /**
     * Issues an {@code UnknownQuery} with execution info enabled on partitions 0 and
     * {@code NUM_BROKERS}, and checks via {@code makeAssertions} that every inspected result
     * still carries non-empty execution info.
     */
    public void shouldCollectExecutionInfoUnderFailure() {
        final UnknownQuery query = new UnknownQuery();
        final Set<Integer> partitions = Utils.mkSet(0, NUM_BROKERS);

        makeAssertions(
            partitions,
            IntegrationTestUtils.iqv2WaitForResult(
                this.kafkaStreams,
                StateQueryRequest.inStore(STORE_NAME)
                    .withQuery(query)
                    .enableExecutionInfo()
                    .withPartitions(partitions)
                    .withPositionBound(PositionBound.at(INPUT_POSITION))),
            partitionResult ->
                MatcherAssert.assertThat(partitionResult.getExecutionInfo(), Matchers.not(Matchers.empty())));
    }

    /**
     * Applies {@code consumer} to the results contained in {@code stateQueryResult}.
     *
     * <p>When a global result is present, only that result is handed to the consumer. Otherwise
     * the partitions present in the result must equal {@code set}, and the consumer is invoked
     * once per partition in {@code set}'s iteration order.
     *
     * @param set              the partitions that are expected to have produced a result
     * @param stateQueryResult the query outcome to inspect
     * @param consumer         the assertions to run against each individual result
     * @param <R>              the result type of the query
     */
    private <R> void makeAssertions(Set<Integer> set, StateQueryResult<R> stateQueryResult, Consumer<QueryResult<R>> consumer) {
        final QueryResult<R> globalResult = stateQueryResult.getGlobalResult();
        if (globalResult == null) {
            final Map<Integer, QueryResult<R>> partitionResults = stateQueryResult.getPartitionResults();
            MatcherAssert.assertThat(partitionResults.keySet(), Matchers.is(set));
            for (final Integer partition : set) {
                consumer.accept(partitionResults.get(partition));
            }
        } else {
            consumer.accept(globalResult);
        }
    }

    /**
     * Builds the Kafka Streams configuration for one test application.
     *
     * <p>The application id is {@code "app-"} followed by this test class's name, the four
     * arguments and a random integer, all joined with {@code '-'}. Each call also advances the
     * static {@code port} counter by {@code NUM_BROKERS} and advertises the new value as
     * {@code localhost:<port>} in {@code application.server}.
     *
     * @param z    first flag; used only as part of the application id
     * @param z2   second flag; used only as part of the application id
     * @param str  first label; used only as part of the application id
     * @param str2 second label; used only as part of the application id
     * @return a freshly populated {@link Properties} instance
     */
    private static Properties streamsConfiguration(boolean z, boolean z2, String str, String str2) {
        final String uniqueSuffix = String.join(
            "-",
            IQv2StoreIntegrationTest.class.getName(),
            String.valueOf(z),
            String.valueOf(z2),
            str,
            str2,
            String.valueOf(RANDOM.nextInt()));

        final Properties config = new Properties();
        config.put("topology.optimization", "all");
        config.put("application.id", "app-" + uniqueSuffix);
        // The compound assignment advances the shared counter and yields the value to advertise.
        config.put("application.server", "localhost:" + (port += NUM_BROKERS));
        config.put("bootstrap.servers", CLUSTER.bootstrapServers());
        config.put("state.dir", TestUtils.tempDirectory().getPath());
        config.put("default.key.serde", Serdes.Integer().getClass());
        config.put("default.value.serde", Serdes.Integer().getClass());
        config.put("num.standby.replicas", NUM_BROKERS);
        config.put("max.poll.records", 100);
        config.put("heartbeat.interval.ms", 200);
        config.put("session.timeout.ms", 1000);
        config.put("commit.interval.ms", 100L);
        config.put("num.stream.threads", NUM_BROKERS);
        return config;
    }
}
