package org.apache.kafka.streams.kstream.internals;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.EmitStrategy;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.Merger;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.internals.RocksDbTimeOrderedSessionBytesStoreSupplier;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.MockRecordCollector;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

/* loaded from: input_file:org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.class */
public class KStreamSessionWindowAggregateProcessorTest {
    /** Inactivity gap, in milliseconds, of the session windows built in {@code setup}. */
    private static final long GAP_MS = 300_000L;
    /** Name shared by the aggregate processor and the session store it reads and writes. */
    private static final String STORE_NAME = "session-store";

    private final MockTime time = new MockTime();
    private final Metrics metrics = new Metrics();
    private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test", "latest", time);
    /** Name of the thread running the test; used as the {@code thread-id} tag when looking up metrics. */
    private final String threadId = Thread.currentThread().getName();

    /** Every aggregate starts at zero. */
    private final Initializer<Long> initializer = () -> 0L;
    /** Counts records: each record adds one to the running aggregate, ignoring key and value. */
    private final Aggregator<String, String, Long> aggregator = (key, value, aggregate) -> aggregate + 1;
    /** Merging two sessions sums their counts. */
    private final Merger<String, Long> sessionMerger = (key, left, right) -> left + right;

    /** Records forwarded by the processor, captured by the context created in {@code setup}. */
    private final List<KeyValueTimestamp<Windowed<String>, Change<Long>>> results = new ArrayList<>();

    private InternalMockProcessorContext<Windowed<String>, Change<Long>> context;
    private KStreamSessionWindowAggregate<String, String, Long> sessionAggregator;
    private Processor<String, String, Windowed<String>, Change<Long>> processor;
    private SessionStore<String, Long> sessionStore;
    /** Emit strategy type of the current parameterized run; assigned in {@code setup}. */
    public EmitStrategy.StrategyType type;
    private EmitStrategy emitStrategy;
    /** {@code true} when {@link #type} is {@code ON_WINDOW_CLOSE}. */
    private boolean emitFinal;

    /**
     * Creates the mock processor context, the session-window aggregate processor and its
     * session store for one parameterized test run.
     *
     * @param strategyType  emit strategy the aggregate is built with
     * @param enableCaching forwarded to {@code initStore}
     */
    private void setup(EmitStrategy.StrategyType strategyType, boolean enableCaching) {
        type = strategyType;

        final Properties props = StreamsTestUtils.getStreamsConfig();
        // NOTE(review): presumably a zero emit interval makes the processor emit on every record - confirm.
        props.put("__emit.interval.ms.kstreams.windowed.aggregation__", 0);
        final StreamsConfig config = new StreamsConfig(props);
        final ThreadCache cache = new ThreadCache(new LogContext("testCache "), 100000L, streamsMetrics);

        context = new InternalMockProcessorContext<Windowed<String>, Change<Long>>(
            TestUtils.tempDirectory(),
            Serdes.String(),
            Serdes.String(),
            streamsMetrics,
            config,
            MockRecordCollector::new,
            cache,
            time
        ) {
            // Capture everything the processor forwards so tests can assert on it.
            @Override
            public <K extends Windowed<String>, V extends Change<Long>> void forward(final Record<K, V> record) {
                KStreamSessionWindowAggregateProcessorTest.this.results.add(
                    new KeyValueTimestamp<>(record.key(), record.value(), record.timestamp()));
            }
        };

        emitFinal = type.equals(EmitStrategy.StrategyType.ON_WINDOW_CLOSE);
        emitStrategy = EmitStrategy.StrategyType.forType(type);
        sessionAggregator = new KStreamSessionWindowAggregate<>(
            SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMillis(GAP_MS)),
            STORE_NAME,
            emitStrategy,
            initializer,
            aggregator,
            sessionMerger
        );

        // Release a processor left over from an earlier setup() call before replacing it.
        if (processor != null) {
            processor.close();
        }
        processor = sessionAggregator.get();

        context.setTime(0L);
        // Return value is unused; the call is made for its registration side effect.
        // NOTE(review): presumably this is what makes the dropped-records metrics visible to the tests - confirm.
        TaskMetrics.droppedRecordsSensor(threadId, context.taskId().toString(), streamsMetrics);

        initStore(enableCaching);
        processor.init(context);
    }

    /**
     * (Re)builds the session store and initializes it against the current context.
     * Any previously built store is closed first.
     *
     * <p>For {@code ON_WINDOW_CLOSE} a {@link RocksDbTimeOrderedSessionBytesStoreSupplier} backs
     * the store and caching is never enabled; otherwise a persistent session store is used.
     *
     * @param enableCaching whether to enable caching on the store (ignored for {@code ON_WINDOW_CLOSE})
     */
    private void initStore(boolean enableCaching) {
        final boolean onWindowClose = emitStrategy.type() == EmitStrategy.StrategyType.ON_WINDOW_CLOSE;
        final long retentionMs = 3 * GAP_MS;

        // The builder must be fully typed: a raw StoreBuilder's build() returns the erased
        // StateStore, which cannot be assigned to the SessionStore<String, Long> field.
        final StoreBuilder<SessionStore<String, Long>> storeBuilder = Stores.sessionStoreBuilder(
            onWindowClose
                ? new RocksDbTimeOrderedSessionBytesStoreSupplier(STORE_NAME, retentionMs, true)
                : Stores.persistentSessionStore(STORE_NAME, Duration.ofMillis(retentionMs)),
            Serdes.String(),
            Serdes.Long()
        ).withLoggingDisabled();

        if (enableCaching && !onWindowClose) {
            storeBuilder.withCachingEnabled();
        }

        if (sessionStore != null) {
            sessionStore.close();
        }
        sessionStore = storeBuilder.build();
        sessionStore.init(context, sessionStore);
    }

    /**
     * Releases the session store and the processor after each test.
     *
     * <p>Both fields are only assigned inside {@code setup}; if a test fails before that
     * point they are still {@code null}, and an unguarded dereference here would raise a
     * {@link NullPointerException} that hides the original failure.
     */
    @AfterEach
    public void closeStore() {
        if (sessionStore != null) {
            sessionStore.close();
        }
        if (processor != null) {
            processor.close();
        }
    }

    /** Two records for one key that fall within the inactivity gap end up in a single session with count 2. */
    @ParameterizedTest
    @EnumSource(EmitStrategy.StrategyType.class)
    public void shouldCreateSingleSessionWhenWithinGap(EmitStrategy.StrategyType strategyType) {
        setup(strategyType, true);

        processor.process(new Record<>("john", "first", 0L));
        processor.process(new Record<>("john", "second", 500L));

        try (KeyValueIterator<Windowed<String>, Long> sessions = sessionStore.findSessions("john", 0L, 2000L)) {
            Assertions.assertTrue(sessions.hasNext());
            Assertions.assertEquals(2L, sessions.next().value);
        }
    }

    /**
     * Two sessions for one key that start more than the inactivity gap apart are merged into
     * one when a later record arrives with a timestamp between them.
     *
     * <p>Every iterator obtained from the store is closed, including the ones used only
     * for existence checks.
     */
    @ParameterizedTest
    @EnumSource(EmitStrategy.StrategyType.class)
    public void shouldMergeSessions(EmitStrategy.StrategyType strategyType) {
        setup(strategyType, true);
        final String sessionId = "mel";
        final long secondSessionTs = GAP_MS + 1;

        processor.process(new Record<>(sessionId, "first", 0L));
        try (KeyValueIterator<Windowed<String>, Long> first = sessionStore.findSessions(sessionId, 0L, 0L)) {
            Assertions.assertTrue(first.hasNext());
        }

        // One millisecond beyond the gap: must start a second session rather than extend the first.
        processor.process(new Record<>(sessionId, "second", secondSessionTs));
        try (KeyValueIterator<Windowed<String>, Long> second =
                 sessionStore.findSessions(sessionId, secondSessionTs, secondSessionTs)) {
            Assertions.assertTrue(second.hasNext());
        }
        // The first session must still be there.
        try (KeyValueIterator<Windowed<String>, Long> first = sessionStore.findSessions(sessionId, 0L, 0L)) {
            Assertions.assertTrue(first.hasNext());
        }

        // A record halfway between the two sessions is within the gap of both, so they merge.
        processor.process(new Record<>(sessionId, "third", GAP_MS / 2));

        try (KeyValueIterator<Windowed<String>, Long> merged =
                 sessionStore.findSessions(sessionId, 0L, secondSessionTs)) {
            Assertions.assertEquals(3L, merged.next().value);
            Assertions.assertFalse(merged.hasNext());
        }
    }

    /** Two records with the same key and timestamp update one session instead of creating two. */
    @ParameterizedTest
    @EnumSource(EmitStrategy.StrategyType.class)
    public void shouldUpdateSessionIfTheSameTime(EmitStrategy.StrategyType strategyType) {
        setup(strategyType, true);

        processor.process(new Record<>("mel", "first", 0L));
        processor.process(new Record<>("mel", "second", 0L));

        try (KeyValueIterator<Windowed<String>, Long> sessions = sessionStore.findSessions("mel", 0L, 0L)) {
            Assertions.assertEquals(2L, sessions.next().value);
            Assertions.assertFalse(sessions.hasNext());
        }
    }

    /**
     * Records for one key whose timestamps are more than the inactivity gap apart produce
     * separate sessions. With {@code ON_WINDOW_CLOSE} the most recent session is not
     * expected in the forwarded results.
     */
    @ParameterizedTest
    @EnumSource(EmitStrategy.StrategyType.class)
    public void shouldHaveMultipleSessionsForSameIdWhenTimestampApartBySessionGap(EmitStrategy.StrategyType strategyType) {
        setup(strategyType, true);
        final String sessionId = "mel";
        final long secondTs = GAP_MS + 1;
        final long thirdTs = secondTs + GAP_MS + 1;

        processor.process(new Record<>(sessionId, "first", 0L));
        processor.process(new Record<>(sessionId, "second", secondTs));
        processor.process(new Record<>(sessionId, "second", secondTs));
        processor.process(new Record<>(sessionId, "third", thirdTs));
        processor.process(new Record<>(sessionId, "third", thirdTs));
        processor.process(new Record<>(sessionId, "third", thirdTs));

        sessionStore.flush();

        final List<KeyValueTimestamp<Windowed<String>, Change<Long>>> expected = new ArrayList<>();
        expected.add(new KeyValueTimestamp<>(
            new Windowed<>(sessionId, new SessionWindow(0L, 0L)), new Change<>(1L, null), 0L));
        expected.add(new KeyValueTimestamp<>(
            new Windowed<>(sessionId, new SessionWindow(secondTs, secondTs)), new Change<>(2L, null), secondTs));
        if (!emitFinal) {
            expected.add(new KeyValueTimestamp<>(
                new Windowed<>(sessionId, new SessionWindow(thirdTs, thirdTs)), new Change<>(3L, null), thirdTs));
        }

        Assertions.assertEquals(expected, results);
    }

    /**
     * After a second record extends an existing session, the store holds only the merged
     * session {@code [0,100]}; the original {@code [0,0]} session is gone.
     *
     * <p>Both iterators are scoped with try-with-resources so they are closed even when
     * an assertion fails.
     */
    @ParameterizedTest
    @EnumSource(EmitStrategy.StrategyType.class)
    public void shouldRemoveMergedSessionsFromStateStore(EmitStrategy.StrategyType strategyType) {
        setup(strategyType, true);

        processor.process(new Record<>("a", "1", 0L));
        try (KeyValueIterator<Windowed<String>, Long> sessions = sessionStore.findSessions("a", 0L, 0L)) {
            Assertions.assertEquals(
                KeyValue.pair(new Windowed<>("a", new SessionWindow(0L, 0L)), 1L),
                sessions.next());
        }

        processor.process(new Record<>("a", "2", 100L));
        try (KeyValueIterator<Windowed<String>, Long> sessions = sessionStore.findSessions("a", 0L, 100L)) {
            Assertions.assertEquals(
                KeyValue.pair(new Windowed<>("a", new SessionWindow(0L, 100L)), 2L),
                sessions.next());
            // Only the merged session may remain for this key.
            Assertions.assertFalse(sessions.hasNext());
        }
    }

    /**
     * Interleaves records for four keys and checks the forwarded results after a flush.
     * With {@code ON_WINDOW_CLOSE} only the four earliest sessions are expected; otherwise the
     * three later sessions are expected as well.
     */
    @ParameterizedTest
    @EnumSource(EmitStrategy.StrategyType.class)
    public void shouldHandleMultipleSessionsAndMerging(EmitStrategy.StrategyType strategyType) {
        setup(strategyType, true);
        final long halfGapTs = GAP_MS / 2;
        final long afterGapTs = GAP_MS + 1;
        final long laterTs = afterGapTs + halfGapTs;

        processor.process(new Record<>("a", "1", 0L));
        processor.process(new Record<>("b", "1", 0L));
        processor.process(new Record<>("c", "1", 0L));
        processor.process(new Record<>("d", "1", 0L));
        processor.process(new Record<>("d", "2", halfGapTs));
        processor.process(new Record<>("a", "2", afterGapTs));
        processor.process(new Record<>("b", "2", afterGapTs));
        processor.process(new Record<>("a", "3", laterTs));
        processor.process(new Record<>("c", "3", laterTs));

        sessionStore.flush();

        final List<KeyValueTimestamp<Windowed<String>, Change<Long>>> expected = new ArrayList<>();
        expected.add(new KeyValueTimestamp<>(
            new Windowed<>("a", new SessionWindow(0L, 0L)), new Change<>(1L, null), 0L));
        expected.add(new KeyValueTimestamp<>(
            new Windowed<>("b", new SessionWindow(0L, 0L)), new Change<>(1L, null), 0L));
        expected.add(new KeyValueTimestamp<>(
            new Windowed<>("c", new SessionWindow(0L, 0L)), new Change<>(1L, null), 0L));
        expected.add(new KeyValueTimestamp<>(
            new Windowed<>("d", new SessionWindow(0L, halfGapTs)), new Change<>(2L, null), halfGapTs));
        if (!emitFinal) {
            expected.add(new KeyValueTimestamp<>(
                new Windowed<>("b", new SessionWindow(afterGapTs, afterGapTs)), new Change<>(1L, null), afterGapTs));
            expected.add(new KeyValueTimestamp<>(
                new Windowed<>("a", new SessionWindow(afterGapTs, laterTs)), new Change<>(2L, null), laterTs));
            expected.add(new KeyValueTimestamp<>(
                new Windowed<>("c", new SessionWindow(laterTs, laterTs)), new Change<>(1L, null), laterTs));
        }

        Assertions.assertEquals(expected, results);
    }

    /** The value getter obtained from the aggregate's view returns the count of each session window. */
    @ParameterizedTest
    @EnumSource(EmitStrategy.StrategyType.class)
    public void shouldGetAggregatedValuesFromValueGetter(EmitStrategy.StrategyType strategyType) {
        setup(strategyType, true);
        final long secondSessionTs = GAP_MS + 1;

        final KTableValueGetter<Windowed<String>, Long> getter = sessionAggregator.view().get();
        getter.init(context);

        processor.process(new Record<>("a", "1", 0L));
        processor.process(new Record<>("a", "1", secondSessionTs));
        processor.process(new Record<>("a", "2", secondSessionTs));

        // Read both values before asserting, as a missing entry should fail on the lookup itself.
        final long firstCount =
            getter.get(new Windowed<>("a", new SessionWindow(0L, 0L))).value();
        final long secondCount =
            getter.get(new Windowed<>("a", new SessionWindow(secondSessionTs, secondSessionTs))).value();

        Assertions.assertEquals(1L, firstCount);
        Assertions.assertEquals(2L, secondCount);
    }

    /**
     * With a store rebuilt without caching, each new session is forwarded as soon as its
     * record is processed. Returns early, asserting nothing, for {@code ON_WINDOW_CLOSE}.
     */
    @ParameterizedTest
    @EnumSource(EmitStrategy.StrategyType.class)
    public void shouldImmediatelyForwardNewSessionWhenNonCachedStore(EmitStrategy.StrategyType strategyType) {
        setup(strategyType, true);
        if (emitFinal) {
            return;
        }

        // Swap the store built by setup() for one without caching and re-initialize the processor.
        initStore(false);
        processor.init(context);

        final List<KeyValueTimestamp<Windowed<String>, Change<Long>>> expected = new ArrayList<>();
        for (final String key : Arrays.asList("a", "b", "c")) {
            processor.process(new Record<>(key, "1", 0L));
            expected.add(new KeyValueTimestamp<>(
                new Windowed<>(key, new SessionWindow(0L, 0L)), new Change<>(1L, null), 0L));
        }

        Assertions.assertEquals(expected, results);
    }

    /**
     * With a store rebuilt without caching, extending a session forwards a {@code null}
     * change for the replaced window followed by the merged session. Returns early, asserting
     * nothing, for {@code ON_WINDOW_CLOSE}.
     */
    @ParameterizedTest
    @EnumSource(EmitStrategy.StrategyType.class)
    public void shouldImmediatelyForwardRemovedSessionsWhenMerging(EmitStrategy.StrategyType strategyType) {
        setup(strategyType, true);
        if (emitFinal) {
            return;
        }

        // Swap the store built by setup() for one without caching and re-initialize the processor.
        initStore(false);
        processor.init(context);

        processor.process(new Record<>("a", "1", 0L));
        processor.process(new Record<>("a", "1", 5L));

        final List<KeyValueTimestamp<Windowed<String>, Change<Long>>> expected = new ArrayList<>();
        // Initial session [0,0] ...
        expected.add(new KeyValueTimestamp<>(
            new Windowed<>("a", new SessionWindow(0L, 0L)), new Change<>(1L, null), 0L));
        // ... a (null, null) change for that same window ...
        expected.add(new KeyValueTimestamp<>(
            new Windowed<>("a", new SessionWindow(0L, 0L)), new Change<>(null, null), 0L));
        // ... and the merged session [0,5].
        expected.add(new KeyValueTimestamp<>(
            new Windowed<>("a", new SessionWindow(0L, 5L)), new Change<>(2L, null), 5L));

        Assertions.assertEquals(expected, results);
    }

    /**
     * A record with a {@code null} key is skipped: a WARN message is logged and the
     * {@code dropped-records-total} task metric reads 1.
     *
     * <p>The log appender is scoped with try-with-resources and the metric is checked after
     * that scope, so the appender is closed exactly once whichever assertion fails.
     */
    @ParameterizedTest
    @EnumSource(EmitStrategy.StrategyType.class)
    public void shouldLogAndMeterWhenSkippingNullKeyWithBuiltInMetrics(EmitStrategy.StrategyType strategyType) {
        setup(strategyType, false);
        // Partition and offset set here reappear in the expected log message below.
        context.setRecordContext(
            new ProcessorRecordContext(-1L, -2L, -3, AssignmentTestUtils.TOPIC_PREFIX, new RecordHeaders()));

        try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister(KStreamSessionWindowAggregate.class)) {
            processor.process(new Record<>(null, "1", 0L));

            final List<String> warnings = appender.getEvents().stream()
                .filter(event -> event.getLevel().equals("WARN"))
                .map(LogCaptureAppender.Event::getMessage)
                .collect(Collectors.toList());
            MatcherAssert.assertThat(
                warnings,
                CoreMatchers.hasItem("Skipping record due to null key. topic=[topic] partition=[-3] offset=[-2]"));
        }

        Assertions.assertEquals(
            Double.valueOf(1.0d),
            StreamsTestUtils.getMetricByName(context.metrics().metrics(), "dropped-records-total", "stream-task-metrics")
                .metricValue());
    }

    /**
     * With a 10 ms gap and zero grace, a record at timestamp 0 that arrives after a record at
     * timestamp 11 is skipped: the "expired window" message is logged and the dropped-records
     * metrics are updated.
     *
     * <p>The log appender is scoped with try-with-resources and the metrics are checked after
     * that scope, so the appender is closed exactly once whichever assertion fails.
     */
    @ParameterizedTest
    @EnumSource(EmitStrategy.StrategyType.class)
    public void shouldLogAndMeterWhenSkippingLateRecordWithZeroGrace(EmitStrategy.StrategyType strategyType) {
        setup(strategyType, false);

        // A dedicated processor with its own window settings; always ON_WINDOW_UPDATE regardless of strategyType.
        final Processor<String, String, Windowed<String>, Change<Long>> graceProcessor =
            new KStreamSessionWindowAggregate<String, String, Long>(
                SessionWindows.ofInactivityGapAndGrace(Duration.ofMillis(10L), Duration.ofMillis(0L)),
                STORE_NAME,
                EmitStrategy.onWindowUpdate(),
                initializer,
                aggregator,
                sessionMerger
            ).get();
        graceProcessor.init(context);

        context.setRecordContext(
            new ProcessorRecordContext(0L, -2L, -3, AssignmentTestUtils.TOPIC_PREFIX, new RecordHeaders()));
        graceProcessor.process(new Record<>("dummy", "dummy", 0L));

        context.setRecordContext(
            new ProcessorRecordContext(0L, -2L, -3, AssignmentTestUtils.TOPIC_PREFIX, new RecordHeaders()));
        graceProcessor.process(new Record<>("OnTime1", "1", 0L));

        // Timestamp 11 is the stream time reported in the expected log message below.
        context.setRecordContext(
            new ProcessorRecordContext(11L, -2L, -3, AssignmentTestUtils.TOPIC_PREFIX, new RecordHeaders()));
        graceProcessor.process(new Record<>("dummy", "dummy", 11L));

        try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister(KStreamSessionWindowAggregate.class)) {
            context.setRecordContext(
                new ProcessorRecordContext(0L, -2L, -3, AssignmentTestUtils.TOPIC_PREFIX, new RecordHeaders()));
            graceProcessor.process(new Record<>("Late1", "1", 0L));

            MatcherAssert.assertThat(
                appender.getMessages(),
                CoreMatchers.hasItem("Skipping record for expired window. topic=[topic] partition=[-3] offset=[-2] timestamp=[0] window=[0,0] expiration=[1] streamTime=[11]"));
        }

        final Map<String, String> tags =
            Utils.mkMap(Utils.mkEntry("thread-id", threadId), Utils.mkEntry("task-id", "0_0"));
        final MetricName dropTotal = new MetricName(
            "dropped-records-total", "stream-task-metrics", "The total number of dropped records", tags);
        final MetricName dropRate = new MetricName(
            "dropped-records-rate", "stream-task-metrics", "The average number of dropped records per second", tags);

        final KafkaMetric totalMetric = metrics.metrics().get(dropTotal);
        final KafkaMetric rateMetric = metrics.metrics().get(dropRate);
        MatcherAssert.assertThat(totalMetric.metricValue(), CoreMatchers.is(Double.valueOf(1.0d)));
        MatcherAssert.assertThat((Double) rateMetric.metricValue(), Matchers.greaterThan(Double.valueOf(0.0d)));
    }

    /**
     * With a 10 ms gap and 1 ms grace, a record at timestamp 0 that arrives after a record at
     * timestamp 12 is skipped: the "expired window" message is logged and the dropped-records
     * metrics are updated. The total of exactly 1 asserted below means the record at
     * timestamp 0 processed after timestamp 11 ("OnTime2") is not counted as dropped.
     *
     * <p>The log appender is scoped with try-with-resources and the metrics are checked after
     * that scope, so the appender is closed exactly once whichever assertion fails.
     */
    @ParameterizedTest
    @EnumSource(EmitStrategy.StrategyType.class)
    public void shouldLogAndMeterWhenSkippingLateRecordWithNonzeroGrace(EmitStrategy.StrategyType strategyType) {
        setup(strategyType, false);

        // A dedicated processor with its own window settings; always ON_WINDOW_UPDATE regardless of strategyType.
        final Processor<String, String, Windowed<String>, Change<Long>> graceProcessor =
            new KStreamSessionWindowAggregate<String, String, Long>(
                SessionWindows.ofInactivityGapAndGrace(Duration.ofMillis(10L), Duration.ofMillis(1L)),
                STORE_NAME,
                EmitStrategy.onWindowUpdate(),
                initializer,
                aggregator,
                sessionMerger
            ).get();
        graceProcessor.init(context);

        try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister(KStreamSessionWindowAggregate.class)) {
            context.setRecordContext(
                new ProcessorRecordContext(0L, -2L, -3, AssignmentTestUtils.TOPIC_PREFIX, new RecordHeaders()));
            graceProcessor.process(new Record<>("dummy", "dummy", 0L));

            context.setRecordContext(
                new ProcessorRecordContext(0L, -2L, -3, AssignmentTestUtils.TOPIC_PREFIX, new RecordHeaders()));
            graceProcessor.process(new Record<>("OnTime1", "1", 0L));

            context.setRecordContext(
                new ProcessorRecordContext(11L, -2L, -3, AssignmentTestUtils.TOPIC_PREFIX, new RecordHeaders()));
            graceProcessor.process(new Record<>("dummy", "dummy", 11L));

            context.setRecordContext(
                new ProcessorRecordContext(0L, -2L, -3, AssignmentTestUtils.TOPIC_PREFIX, new RecordHeaders()));
            graceProcessor.process(new Record<>("OnTime2", "1", 0L));

            // Timestamp 12 is the stream time reported in the expected log message below.
            context.setRecordContext(
                new ProcessorRecordContext(12L, -2L, -3, AssignmentTestUtils.TOPIC_PREFIX, new RecordHeaders()));
            graceProcessor.process(new Record<>("dummy", "dummy", 12L));

            context.setRecordContext(
                new ProcessorRecordContext(0L, -2L, -3, AssignmentTestUtils.TOPIC_PREFIX, new RecordHeaders()));
            graceProcessor.process(new Record<>("Late1", "1", 0L));

            MatcherAssert.assertThat(
                appender.getMessages(),
                CoreMatchers.hasItem("Skipping record for expired window. topic=[topic] partition=[-3] offset=[-2] timestamp=[0] window=[0,0] expiration=[1] streamTime=[12]"));
        }

        final Map<String, String> tags =
            Utils.mkMap(Utils.mkEntry("thread-id", threadId), Utils.mkEntry("task-id", "0_0"));
        final MetricName dropTotal = new MetricName(
            "dropped-records-total", "stream-task-metrics", "The total number of dropped records", tags);
        final MetricName dropRate = new MetricName(
            "dropped-records-rate", "stream-task-metrics", "The average number of dropped records per second", tags);

        final KafkaMetric totalMetric = metrics.metrics().get(dropTotal);
        final KafkaMetric rateMetric = metrics.metrics().get(dropRate);
        MatcherAssert.assertThat(totalMetric.metricValue(), CoreMatchers.is(Double.valueOf(1.0d)));
        MatcherAssert.assertThat((Double) rateMetric.metricValue(), Matchers.greaterThan(Double.valueOf(0.0d)));
    }
}
