package org.apache.kafka.streams.processor.internals.metrics;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MeasurableStat;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.MetricValueProvider;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.test.StreamsTestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;

@ExtendWith({MockitoExtension.class})
@MockitoSettings(strictness = Strictness.STRICT_STUBS)
/* loaded from: input_file:org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImplTest.class */
public class StreamsMetricsImplTest {
    // Constant names, ids, tags and descriptions shared by the tests.
    private static final String SENSOR_PREFIX_DELIMITER = ".";
    private static final String SENSOR_NAME_DELIMITER = ".s.";
    private static final String SENSOR_NAME_1 = "sensor1";
    private static final String SENSOR_NAME_2 = "sensor2";
    private static final String INTERNAL_PREFIX = "internal";
    private static final String VERSION = "latest";
    private static final String CLIENT_ID = "test-client";
    private static final String THREAD_ID1 = "test-thread-1";
    private static final String TASK_ID2 = "test-task-2";
    private static final String NODE_ID1 = "test-node-1";
    private static final String NODE_ID2 = "test-node-2";
    private static final String TOPIC_ID1 = "test-topic-1";
    private static final String TOPIC_ID2 = "test-topic-2";
    private static final String METRIC_NAME1 = "test-metric1";
    private static final String METRIC_NAME2 = "test-metric2";
    private static final String SCOPE_NAME = "test-scope";
    private static final String STORE_ID_TAG = "-state-id";
    private static final String STORE_NAME2 = "store2";
    private static final String RECORD_CACHE_ID_TAG = "record-cache-id";
    private static final String ENTITY_NAME = "test-entity";
    private static final String OPERATION_NAME = "test-operation";
    private static final String CUSTOM_TAG_KEY1 = "test-key1";
    private static final String CUSTOM_TAG_VALUE1 = "test-value1";
    private static final String CUSTOM_TAG_KEY2 = "test-key2";
    private static final String CUSTOM_TAG_VALUE2 = "test-value2";
    private static final String DESCRIPTION1 = "description number one";
    private static final String DESCRIPTION2 = "description number two";
    private static final String DESCRIPTION3 = "description number three";

    // Per-instance fixtures. Declaration order matters: later initializers read earlier fields.
    private final Metrics metrics = new Metrics();
    private final Sensor sensor = this.metrics.sensor("dummy");
    private final String metricNamePrefix = "metric";
    private final String group = "group";
    private final Map<String, String> tags = Utils.mkMap(Utils.mkEntry("tag", "value"));
    private final Map<String, String> clientLevelTags = Utils.mkMap(Utils.mkEntry("client-id", CLIENT_ID));
    private final MetricName metricName1 = new MetricName(METRIC_NAME1, "stream-metrics", DESCRIPTION1, this.clientLevelTags);
    // Built from METRIC_NAME2 (previously METRIC_NAME1) so that it matches the stubbing in
    // addSensorsOnAllLevels, which returns this instance for METRIC_NAME2/DESCRIPTION2.
    private final MetricName metricName2 = new MetricName(METRIC_NAME2, "stream-metrics", DESCRIPTION2, this.clientLevelTags);
    private final MockTime time = new MockTime(0);
    private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(this.metrics, CLIENT_ID, VERSION, this.time);

    // Tag keys and values used to build the expected store-level tag map below.
    private static final String THREAD_ID_TAG = "thread-id";
    private static final String TASK_ID_TAG = "task-id";
    private static final String TASK_ID1 = "test-task-1";
    private static final String STORE_NAME1 = "store1";
    // The thread-id tag value is the name of the thread that initializes this class.
    private static final Map<String, String> STORE_LEVEL_TAG_MAP = Utils.mkMap(
        Utils.mkEntry(THREAD_ID_TAG, Thread.currentThread().getName()),
        Utils.mkEntry(TASK_ID_TAG, TASK_ID1),
        Utils.mkEntry("test-scope-state-id", STORE_NAME1)
    );
    private static final Sensor.RecordingLevel INFO_RECORDING_LEVEL = Sensor.RecordingLevel.INFO;
    // Gauge that ignores its arguments and always reports the same string.
    private static final Gauge<String> VALUE_PROVIDER = (config, now) -> "mutable-value";

    /**
     * Registers a Mockito argument matcher that accepts a {@link MetricConfig} whose quota, tags,
     * event window, recording level, sample count and time window all equal those of
     * {@code metricConfig}.
     *
     * <p>Quotas are compared null-safely: two {@code null} quotas match, and a {@code null} quota on
     * one side only is a plain mismatch rather than a {@link NullPointerException}.
     *
     * @param metricConfig the expected configuration
     * @return always {@code null}; the value is only a placeholder for use in a stubbed call
     */
    private static MetricConfig eqMetricConfig(MetricConfig metricConfig) {
        ArgumentMatchers.<MetricConfig>argThat(actual -> {
            if (actual == null) {
                return false;
            }
            // Typed as Object so the null-safe comparison needs no additional import.
            final Object actualQuota = actual.quota();
            final Object expectedQuota = metricConfig.quota();
            final boolean sameQuota = actualQuota == expectedQuota
                || (actualQuota != null && actualQuota.equals(expectedQuota));
            return sameQuota
                && actual.tags().equals(metricConfig.tags())
                && actual.eventWindow() == metricConfig.eventWindow()
                && actual.recordLevel() == metricConfig.recordLevel()
                && actual.samples() == metricConfig.samples()
                && actual.timeWindowMs() == metricConfig.timeWindowMs();
        });
        return null;
    }

    /**
     * Stubs sensor creation and the two client-level metric names on {@code metrics}, then registers
     * client-level metrics, client/thread/task/store-level sensors and store-level mutable metrics
     * on {@code streamsMetricsImpl}.
     *
     * @param metrics            the mocked registry to stub
     * @param streamsMetricsImpl the instance under test
     * @return captor holding every sensor name passed to the stubbed {@code Metrics#sensor} call,
     *         in registration order (client, thread, task, then store level)
     */
    private ArgumentCaptor<String> addSensorsOnAllLevels(Metrics metrics, StreamsMetricsImpl streamsMetricsImpl) {
        final ArgumentCaptor<String> sensorNames = ArgumentCaptor.forClass(String.class);
        final Sensor[] noParents = new Sensor[0];

        Mockito.when(metrics.sensor(sensorNames.capture(), ArgumentMatchers.eq(INFO_RECORDING_LEVEL), new Sensor[0]))
            .thenReturn(this.sensor);
        Mockito.when(metrics.metricName(METRIC_NAME1, "stream-metrics", DESCRIPTION1, this.clientLevelTags))
            .thenReturn(this.metricName1);
        Mockito.when(metrics.metricName(METRIC_NAME2, "stream-metrics", DESCRIPTION2, this.clientLevelTags))
            .thenReturn(this.metricName2);

        // Client-level metrics.
        streamsMetricsImpl.addClientLevelImmutableMetric(METRIC_NAME1, DESCRIPTION1, INFO_RECORDING_LEVEL, "value");
        streamsMetricsImpl.addClientLevelImmutableMetric(METRIC_NAME2, DESCRIPTION2, INFO_RECORDING_LEVEL, "value");

        // Sensors, two per level; the order fixes the indices of the captured names.
        streamsMetricsImpl.clientLevelSensor(SENSOR_NAME_1, INFO_RECORDING_LEVEL, noParents);
        streamsMetricsImpl.clientLevelSensor(SENSOR_NAME_2, INFO_RECORDING_LEVEL, noParents);
        streamsMetricsImpl.threadLevelSensor(THREAD_ID1, SENSOR_NAME_1, INFO_RECORDING_LEVEL, noParents);
        streamsMetricsImpl.threadLevelSensor(THREAD_ID1, SENSOR_NAME_2, INFO_RECORDING_LEVEL, noParents);
        streamsMetricsImpl.taskLevelSensor(THREAD_ID1, TASK_ID1, SENSOR_NAME_1, INFO_RECORDING_LEVEL, noParents);
        streamsMetricsImpl.taskLevelSensor(THREAD_ID1, TASK_ID1, SENSOR_NAME_2, INFO_RECORDING_LEVEL, noParents);
        streamsMetricsImpl.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1, INFO_RECORDING_LEVEL, noParents);
        streamsMetricsImpl.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_2, INFO_RECORDING_LEVEL, noParents);
        streamsMetricsImpl.storeLevelSensor(TASK_ID1, STORE_NAME2, SENSOR_NAME_1, INFO_RECORDING_LEVEL, noParents);

        // Store-level mutable metrics.
        streamsMetricsImpl.addStoreLevelMutableMetric(
            TASK_ID1, SCOPE_NAME, STORE_NAME1, METRIC_NAME1, DESCRIPTION1, INFO_RECORDING_LEVEL, VALUE_PROVIDER);
        streamsMetricsImpl.addStoreLevelMutableMetric(
            TASK_ID1, SCOPE_NAME, STORE_NAME1, METRIC_NAME2, DESCRIPTION2, INFO_RECORDING_LEVEL, VALUE_PROVIDER);
        streamsMetricsImpl.addStoreLevelMutableMetric(
            TASK_ID1, SCOPE_NAME, STORE_NAME2, METRIC_NAME1, DESCRIPTION1, INFO_RECORDING_LEVEL, VALUE_PROVIDER);

        return sensorNames;
    }

    /**
     * Stubs {@code metrics} so that no sensor exists yet ({@code getSensor} returns {@code null})
     * and creating one at {@code recordingLevel} without parents returns the shared test sensor.
     *
     * @param metrics        the mocked registry to stub
     * @param recordingLevel the recording level the sensor is expected to be created with
     * @return captor recording the names passed to {@code getSensor} and {@code sensor}
     */
    private ArgumentCaptor<String> setupGetNewSensorTest(Metrics metrics, Sensor.RecordingLevel recordingLevel) {
        final ArgumentCaptor<String> sensorName = ArgumentCaptor.forClass(String.class);
        Mockito.when(metrics.getSensor(sensorName.capture())).thenReturn(null);
        // Match on the level supplied by the caller; it was previously ignored in favour of INFO.
        Mockito.when(metrics.sensor(sensorName.capture(), ArgumentMatchers.eq(recordingLevel), new Sensor[0]))
            .thenReturn(this.sensor);
        return sensorName;
    }

    /**
     * Stubs {@code metrics} so that a lookup by any sensor name returns the shared test sensor.
     *
     * @param metrics the mocked registry to stub
     */
    private void setupGetExistingSensorTest(Metrics metrics) {
        Mockito
            .when(metrics.getSensor(ArgumentMatchers.anyString()))
            .thenReturn(this.sensor);
    }

    // With no sensor registered yet, threadLevelSensor must hand back the sensor stubbed for creation.
    @Test
    public void shouldGetNewThreadLevelSensor() {
        final Metrics mockedMetrics = Mockito.mock(Metrics.class);
        final Sensor.RecordingLevel level = Sensor.RecordingLevel.INFO;
        setupGetNewSensorTest(mockedMetrics, level);
        final StreamsMetricsImpl underTest = new StreamsMetricsImpl(mockedMetrics, CLIENT_ID, VERSION, this.time);

        final Sensor actual = underTest.threadLevelSensor(THREAD_ID1, SENSOR_NAME_1, level, new Sensor[0]);

        MatcherAssert.assertThat(actual, CoreMatchers.is(CoreMatchers.equalToObject(this.sensor)));
    }

    // With a sensor already registered, threadLevelSensor must return that existing sensor.
    @Test
    public void shouldGetExistingThreadLevelSensor() {
        final Metrics mockedMetrics = Mockito.mock(Metrics.class);
        final Sensor.RecordingLevel level = Sensor.RecordingLevel.INFO;
        setupGetExistingSensorTest(mockedMetrics);
        final StreamsMetricsImpl underTest = new StreamsMetricsImpl(mockedMetrics, CLIENT_ID, VERSION, this.time);

        final Sensor actual = underTest.threadLevelSensor(THREAD_ID1, SENSOR_NAME_1, level, new Sensor[0]);

        MatcherAssert.assertThat(actual, CoreMatchers.is(CoreMatchers.equalToObject(this.sensor)));
    }

    // With no sensor registered yet, taskLevelSensor must hand back the sensor stubbed for creation.
    @Test
    public void shouldGetNewTaskLevelSensor() {
        final Metrics mockedMetrics = Mockito.mock(Metrics.class);
        final Sensor.RecordingLevel level = Sensor.RecordingLevel.INFO;
        setupGetNewSensorTest(mockedMetrics, level);
        final StreamsMetricsImpl underTest = new StreamsMetricsImpl(mockedMetrics, CLIENT_ID, VERSION, this.time);

        final Sensor actual = underTest.taskLevelSensor(THREAD_ID1, TASK_ID1, SENSOR_NAME_1, level, new Sensor[0]);

        MatcherAssert.assertThat(actual, CoreMatchers.is(CoreMatchers.equalToObject(this.sensor)));
    }

    // With a sensor already registered, taskLevelSensor must return that existing sensor.
    @Test
    public void shouldGetExistingTaskLevelSensor() {
        final Metrics mockedMetrics = Mockito.mock(Metrics.class);
        final Sensor.RecordingLevel level = Sensor.RecordingLevel.INFO;
        setupGetExistingSensorTest(mockedMetrics);
        final StreamsMetricsImpl underTest = new StreamsMetricsImpl(mockedMetrics, CLIENT_ID, VERSION, this.time);

        final Sensor actual = underTest.taskLevelSensor(THREAD_ID1, TASK_ID1, SENSOR_NAME_1, level, new Sensor[0]);

        MatcherAssert.assertThat(actual, CoreMatchers.is(CoreMatchers.equalToObject(this.sensor)));
    }

    // With no sensor registered yet, topicLevelSensor must hand back the sensor stubbed for creation.
    @Test
    public void shouldGetNewTopicLevelSensor() {
        final Metrics mockedMetrics = Mockito.mock(Metrics.class);
        final Sensor.RecordingLevel level = Sensor.RecordingLevel.INFO;
        setupGetNewSensorTest(mockedMetrics, level);
        final StreamsMetricsImpl underTest = new StreamsMetricsImpl(mockedMetrics, CLIENT_ID, VERSION, this.time);

        final Sensor actual =
            underTest.topicLevelSensor(THREAD_ID1, TASK_ID1, NODE_ID1, TOPIC_ID1, SENSOR_NAME_1, level, new Sensor[0]);

        MatcherAssert.assertThat(actual, CoreMatchers.is(CoreMatchers.equalToObject(this.sensor)));
    }

    // With a sensor already registered, topicLevelSensor must return that existing sensor.
    @Test
    public void shouldGetExistingTopicLevelSensor() {
        final Metrics mockedMetrics = Mockito.mock(Metrics.class);
        final Sensor.RecordingLevel level = Sensor.RecordingLevel.INFO;
        setupGetExistingSensorTest(mockedMetrics);
        final StreamsMetricsImpl underTest = new StreamsMetricsImpl(mockedMetrics, CLIENT_ID, VERSION, this.time);

        final Sensor actual =
            underTest.topicLevelSensor(THREAD_ID1, TASK_ID1, NODE_ID1, TOPIC_ID1, SENSOR_NAME_1, level, new Sensor[0]);

        MatcherAssert.assertThat(actual, CoreMatchers.is(CoreMatchers.equalToObject(this.sensor)));
    }

    // With no sensor registered yet, storeLevelSensor must create one, and the name it looks up
    // must be the same name it creates the sensor under.
    @Test
    public void shouldGetNewStoreLevelSensorIfNoneExists() {
        final Metrics mockedMetrics = Mockito.mock(Metrics.class);
        final Sensor.RecordingLevel level = Sensor.RecordingLevel.INFO;
        final ArgumentCaptor<String> sensorNames = setupGetNewSensorTest(mockedMetrics, level);
        final StreamsMetricsImpl underTest = new StreamsMetricsImpl(mockedMetrics, CLIENT_ID, VERSION, this.time);

        final Sensor actual = underTest.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1, level, new Sensor[0]);

        MatcherAssert.assertThat(actual, CoreMatchers.is(CoreMatchers.equalToObject(this.sensor)));
        final List<String> captured = sensorNames.getAllValues();
        MatcherAssert.assertThat(captured.get(0), CoreMatchers.is(captured.get(1)));
    }

    // With a sensor already registered, storeLevelSensor must return that existing sensor.
    @Test
    public void shouldGetExistingStoreLevelSensor() {
        final Metrics mockedMetrics = Mockito.mock(Metrics.class);
        final Sensor.RecordingLevel level = Sensor.RecordingLevel.INFO;
        setupGetExistingSensorTest(mockedMetrics);
        final StreamsMetricsImpl underTest = new StreamsMetricsImpl(mockedMetrics, CLIENT_ID, VERSION, this.time);

        final Sensor actual = underTest.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1, level, new Sensor[0]);

        MatcherAssert.assertThat(actual, CoreMatchers.is(CoreMatchers.equalToObject(this.sensor)));
    }

    // Two store-level sensors that differ only in sensor name must be looked up under different names.
    @Test
    public void shouldUseSameStoreLevelSensorKeyWithTwoDifferentSensorNames() {
        final Metrics mockedMetrics = Mockito.mock(Metrics.class);
        final ArgumentCaptor<String> lookedUpNames = setUpSensorKeyTests(mockedMetrics);
        final StreamsMetricsImpl underTest = new StreamsMetricsImpl(mockedMetrics, CLIENT_ID, VERSION, this.time);

        underTest.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1, INFO_RECORDING_LEVEL, new Sensor[0]);
        underTest.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_2, INFO_RECORDING_LEVEL, new Sensor[0]);

        final List<String> captured = lookedUpNames.getAllValues();
        MatcherAssert.assertThat(captured.get(0), CoreMatchers.not(captured.get(1)));
    }

    // Two store-level sensors that differ only in task id must be looked up under different names.
    @Test
    public void shouldNotUseSameStoreLevelSensorKeyWithDifferentTaskIds() {
        final Metrics mockedMetrics = Mockito.mock(Metrics.class);
        final ArgumentCaptor<String> lookedUpNames = setUpSensorKeyTests(mockedMetrics);
        final StreamsMetricsImpl underTest = new StreamsMetricsImpl(mockedMetrics, CLIENT_ID, VERSION, this.time);

        underTest.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1, INFO_RECORDING_LEVEL, new Sensor[0]);
        underTest.storeLevelSensor(TASK_ID2, STORE_NAME1, SENSOR_NAME_1, INFO_RECORDING_LEVEL, new Sensor[0]);

        final List<String> captured = lookedUpNames.getAllValues();
        MatcherAssert.assertThat(captured.get(0), CoreMatchers.not(captured.get(1)));
    }

    // Two store-level sensors that differ only in store name must be looked up under different names.
    @Test
    public void shouldNotUseSameStoreLevelSensorKeyWithDifferentStoreNames() {
        final Metrics mockedMetrics = Mockito.mock(Metrics.class);
        final ArgumentCaptor<String> lookedUpNames = setUpSensorKeyTests(mockedMetrics);
        final StreamsMetricsImpl underTest = new StreamsMetricsImpl(mockedMetrics, CLIENT_ID, VERSION, this.time);

        underTest.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1, INFO_RECORDING_LEVEL, new Sensor[0]);
        underTest.storeLevelSensor(TASK_ID1, STORE_NAME2, SENSOR_NAME_1, INFO_RECORDING_LEVEL, new Sensor[0]);

        final List<String> captured = lookedUpNames.getAllValues();
        MatcherAssert.assertThat(captured.get(0), CoreMatchers.not(captured.get(1)));
    }

    // The same store-level sensor requested from two different threads must be looked up under
    // different names.
    @Test
    public void shouldNotUseSameStoreLevelSensorKeyWithDifferentThreadIds() throws InterruptedException {
        final Metrics mockedMetrics = Mockito.mock(Metrics.class);
        final ArgumentCaptor<String> lookedUpNames = setUpSensorKeyTests(mockedMetrics);
        final StreamsMetricsImpl underTest = new StreamsMetricsImpl(mockedMetrics, CLIENT_ID, VERSION, this.time);

        // First request from the test thread ...
        underTest.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1, INFO_RECORDING_LEVEL, new Sensor[0]);

        // ... second, identical request from a separate thread.
        final Thread otherThread = new Thread(
            () -> underTest.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1, INFO_RECORDING_LEVEL, new Sensor[0]));
        otherThread.start();
        otherThread.join();

        final List<String> captured = lookedUpNames.getAllValues();
        MatcherAssert.assertThat(captured.get(0), CoreMatchers.not(captured.get(1)));
    }

    // Two identical store-level sensor requests must be looked up under the same name.
    @Test
    public void shouldUseSameStoreLevelSensorKeyWithSameSensorNames() {
        final Metrics mockedMetrics = Mockito.mock(Metrics.class);
        final ArgumentCaptor<String> lookedUpNames = setUpSensorKeyTests(mockedMetrics);
        final StreamsMetricsImpl underTest = new StreamsMetricsImpl(mockedMetrics, CLIENT_ID, VERSION, this.time);

        underTest.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1, INFO_RECORDING_LEVEL, new Sensor[0]);
        underTest.storeLevelSensor(TASK_ID1, STORE_NAME1, SENSOR_NAME_1, INFO_RECORDING_LEVEL, new Sensor[0]);

        final List<String> captured = lookedUpNames.getAllValues();
        MatcherAssert.assertThat(captured.get(0), CoreMatchers.is(captured.get(1)));
    }

    /**
     * Stubs {@code metrics} so that every sensor lookup returns the shared test sensor while the
     * looked-up name is recorded.
     *
     * @param metrics the mocked registry to stub
     * @return captor holding each name passed to {@code getSensor}, in call order
     */
    private ArgumentCaptor<String> setUpSensorKeyTests(Metrics metrics) {
        final ArgumentCaptor<String> lookedUpNames = ArgumentCaptor.forClass(String.class);
        Mockito
            .when(metrics.getSensor(lookedUpNames.capture()))
            .thenReturn(this.sensor);
        return lookedUpNames;
    }

    // Adding a store-level mutable metric that is not registered yet must look it up and then add it
    // with the expected name, config and value provider. The strict-stubs setting on this class
    // makes any unused stubbing fail the test.
    @Test
    public void shouldAddNewStoreLevelMutableMetric() {
        final Metrics mockedMetrics = Mockito.mock(Metrics.class);
        final MetricName expectedName =
            new MetricName(METRIC_NAME1, "stream-state-metrics", DESCRIPTION1, STORE_LEVEL_TAG_MAP);
        final MetricConfig expectedConfig = new MetricConfig().recordLevel(INFO_RECORDING_LEVEL);
        Mockito.when(mockedMetrics.metricName(METRIC_NAME1, "stream-state-metrics", DESCRIPTION1, STORE_LEVEL_TAG_MAP))
            .thenReturn(expectedName);
        Mockito.when(mockedMetrics.metric(expectedName)).thenReturn(null);
        // The three matchers must be created inline and in argument order.
        Mockito.when(mockedMetrics.addMetricIfAbsent(
                ArgumentMatchers.eq(expectedName),
                eqMetricConfig(expectedConfig),
                ArgumentMatchers.eq(VALUE_PROVIDER)))
            .thenReturn(null);
        final StreamsMetricsImpl underTest = new StreamsMetricsImpl(mockedMetrics, CLIENT_ID, VERSION, this.time);

        underTest.addStoreLevelMutableMetric(
            TASK_ID1, SCOPE_NAME, STORE_NAME1, METRIC_NAME1, DESCRIPTION1, INFO_RECORDING_LEVEL, VALUE_PROVIDER);
    }

    // A metric added through addMetricIfAbsent on a real registry must be retrievable afterwards.
    @Test
    public void shouldCreateNewStoreLevelMutableMetric() {
        final MetricName name = new MetricName(METRIC_NAME1, "stream-state-metrics", DESCRIPTION1, STORE_LEVEL_TAG_MAP);
        final MetricConfig config = new MetricConfig().recordLevel(INFO_RECORDING_LEVEL);
        // Close the real registry when the test ends instead of leaking it.
        try (Metrics realMetrics = new Metrics(config)) {
            Assertions.assertNull(realMetrics.metric(name));
            realMetrics.addMetricIfAbsent(name, config, VALUE_PROVIDER);
            Assertions.assertNotNull(realMetrics.metric(name));
        }
    }

    // NOTE(review): despite its name, this test stubs Metrics#metric to return null (metric absent)
    // and asserts nothing explicitly. It probably should stub an existing metric and verify that
    // addMetricIfAbsent is never called — confirm against StreamsMetricsImpl before changing it.
    @Test
    public void shouldNotAddStoreLevelMutableMetricIfAlreadyExists() {
        final Metrics mockedMetrics = Mockito.mock(Metrics.class);
        final MetricName name = new MetricName(METRIC_NAME1, "stream-state-metrics", DESCRIPTION1, STORE_LEVEL_TAG_MAP);
        Mockito.when(mockedMetrics.metricName(METRIC_NAME1, "stream-state-metrics", DESCRIPTION1, STORE_LEVEL_TAG_MAP))
            .thenReturn(name);
        Mockito.when(mockedMetrics.metric(name)).thenReturn(null);
        final StreamsMetricsImpl underTest = new StreamsMetricsImpl(mockedMetrics, CLIENT_ID, VERSION, this.time);

        underTest.addStoreLevelMutableMetric(
            TASK_ID1, SCOPE_NAME, STORE_NAME1, METRIC_NAME1, DESCRIPTION1, INFO_RECORDING_LEVEL, VALUE_PROVIDER);
    }

    // Calling addMetricIfAbsent twice for the same name on a real registry must yield equal metrics.
    @Test
    public void shouldReturnSameMetricIfAlreadyCreated() {
        final MetricName name = new MetricName(METRIC_NAME1, "stream-state-metrics", DESCRIPTION1, STORE_LEVEL_TAG_MAP);
        final MetricConfig config = new MetricConfig().recordLevel(INFO_RECORDING_LEVEL);
        // Close the real registry when the test ends instead of leaking it.
        try (Metrics realMetrics = new Metrics(config)) {
            Assertions.assertNull(realMetrics.metric(name));
            final Object first = realMetrics.addMetricIfAbsent(name, config, VALUE_PROVIDER);
            final Object second = realMetrics.addMetricIfAbsent(name, config, VALUE_PROVIDER);
            Assertions.assertEquals(first, second);
        }
    }

    // Two threads racing to add the same metric to a real registry must both end up with an equal,
    // non-null metric.
    @Test
    public void shouldCreateMetricOnceDuringConcurrentMetricCreationRequest() throws InterruptedException {
        final MetricName name = new MetricName(METRIC_NAME1, "stream-state-metrics", DESCRIPTION1, STORE_LEVEL_TAG_MAP);
        final MetricConfig config = new MetricConfig().recordLevel(INFO_RECORDING_LEVEL);
        // Close the real registry when the test ends instead of leaking it.
        try (Metrics realMetrics = new Metrics(config)) {
            Assertions.assertNull(realMetrics.metric(name));

            final AtomicReference<KafkaMetric> createdByFirst = new AtomicReference<>();
            final AtomicReference<KafkaMetric> createdBySecond = new AtomicReference<>();
            final Thread first = new Thread(
                () -> createdByFirst.set(realMetrics.addMetricIfAbsent(name, config, VALUE_PROVIDER)));
            final Thread second = new Thread(
                () -> createdBySecond.set(realMetrics.addMetricIfAbsent(name, config, VALUE_PROVIDER)));

            first.start();
            second.start();
            first.join();
            second.join();

            // Without this check the test would pass vacuously (null equals null) if both threads
            // died before storing a result.
            Assertions.assertNotNull(createdByFirst.get());
            Assertions.assertEquals(createdByFirst.get(), createdBySecond.get());
        }
    }

    // Removing everything registered for (TASK_ID1, STORE_NAME1) must remove the two sensors captured
    // at indices 6 and 7 and the two store-level metrics. The strict-stubs setting on this class
    // makes any unused stubbing fail the test.
    @Test
    public void shouldRemoveStateStoreLevelSensors() {
        final Metrics mockedMetrics = Mockito.mock(Metrics.class);
        final StreamsMetricsImpl underTest = new StreamsMetricsImpl(mockedMetrics, CLIENT_ID, VERSION, this.time);
        final MetricName storeMetric1 =
            new MetricName(METRIC_NAME1, "stream-state-metrics", DESCRIPTION1, STORE_LEVEL_TAG_MAP);
        final MetricName storeMetric2 =
            new MetricName(METRIC_NAME2, "stream-state-metrics", DESCRIPTION2, STORE_LEVEL_TAG_MAP);
        Mockito.when(mockedMetrics.metricName(METRIC_NAME1, "stream-state-metrics", DESCRIPTION1, STORE_LEVEL_TAG_MAP))
            .thenReturn(storeMetric1);
        Mockito.when(mockedMetrics.metricName(METRIC_NAME2, "stream-state-metrics", DESCRIPTION2, STORE_LEVEL_TAG_MAP))
            .thenReturn(storeMetric2);
        final ArgumentCaptor<String> sensorNames = addSensorsOnAllLevels(mockedMetrics, underTest);

        Mockito.doNothing().when(mockedMetrics).removeSensor(sensorNames.getAllValues().get(6));
        Mockito.doNothing().when(mockedMetrics).removeSensor(sensorNames.getAllValues().get(7));
        Mockito.when(mockedMetrics.removeMetric(storeMetric1)).thenReturn(null);
        Mockito.when(mockedMetrics.removeMetric(storeMetric2)).thenReturn(null);

        underTest.removeAllStoreLevelSensorsAndMetrics(TASK_ID1, STORE_NAME1);
    }

    // With no sensor registered yet, nodeLevelSensor must hand back the sensor stubbed for creation.
    @Test
    public void shouldGetNewNodeLevelSensor() {
        final Metrics mockedMetrics = Mockito.mock(Metrics.class);
        final Sensor.RecordingLevel level = Sensor.RecordingLevel.INFO;
        setupGetNewSensorTest(mockedMetrics, level);
        final StreamsMetricsImpl underTest = new StreamsMetricsImpl(mockedMetrics, CLIENT_ID, VERSION, this.time);

        final Sensor actual =
            underTest.nodeLevelSensor(THREAD_ID1, TASK_ID1, NODE_ID1, SENSOR_NAME_1, level, new Sensor[0]);

        MatcherAssert.assertThat(actual, CoreMatchers.is(CoreMatchers.equalToObject(this.sensor)));
    }

    // With a sensor already registered, nodeLevelSensor must return that existing sensor.
    @Test
    public void shouldGetExistingNodeLevelSensor() {
        final Metrics mockedMetrics = Mockito.mock(Metrics.class);
        final Sensor.RecordingLevel level = Sensor.RecordingLevel.INFO;
        setupGetExistingSensorTest(mockedMetrics);
        final StreamsMetricsImpl underTest = new StreamsMetricsImpl(mockedMetrics, CLIENT_ID, VERSION, this.time);

        final Sensor actual =
            underTest.nodeLevelSensor(THREAD_ID1, TASK_ID1, NODE_ID1, SENSOR_NAME_1, level, new Sensor[0]);

        MatcherAssert.assertThat(actual, CoreMatchers.is(CoreMatchers.equalToObject(this.sensor)));
    }

    // With no sensor registered yet, cacheLevelSensor must hand back the sensor stubbed for creation.
    @Test
    public void shouldGetNewCacheLevelSensor() {
        final Metrics mockedMetrics = Mockito.mock(Metrics.class);
        final Sensor.RecordingLevel level = Sensor.RecordingLevel.INFO;
        setupGetNewSensorTest(mockedMetrics, level);
        final StreamsMetricsImpl underTest = new StreamsMetricsImpl(mockedMetrics, CLIENT_ID, VERSION, this.time);

        final Sensor actual =
            underTest.cacheLevelSensor(THREAD_ID1, TASK_ID1, "processorNodeName", SENSOR_NAME_1, level, new Sensor[0]);

        MatcherAssert.assertThat(actual, CoreMatchers.is(CoreMatchers.equalToObject(this.sensor)));
    }

    // With a sensor already registered, cacheLevelSensor must return that existing sensor.
    @Test
    public void shouldGetExistingCacheLevelSensor() {
        final Metrics mockedMetrics = Mockito.mock(Metrics.class);
        final Sensor.RecordingLevel level = Sensor.RecordingLevel.INFO;
        setupGetExistingSensorTest(mockedMetrics);
        final StreamsMetricsImpl underTest = new StreamsMetricsImpl(mockedMetrics, CLIENT_ID, VERSION, this.time);

        final Sensor actual =
            underTest.cacheLevelSensor(THREAD_ID1, TASK_ID1, "processorNodeName", SENSOR_NAME_1, level, new Sensor[0]);

        MatcherAssert.assertThat(actual, CoreMatchers.is(CoreMatchers.equalToObject(this.sensor)));
    }

    // With no sensor registered yet, clientLevelSensor must hand back the sensor stubbed for creation.
    @Test
    public void shouldGetNewClientLevelSensor() {
        final Metrics mockedMetrics = Mockito.mock(Metrics.class);
        final Sensor.RecordingLevel level = Sensor.RecordingLevel.INFO;
        setupGetNewSensorTest(mockedMetrics, level);
        final StreamsMetricsImpl underTest = new StreamsMetricsImpl(mockedMetrics, CLIENT_ID, VERSION, this.time);

        final Sensor actual = underTest.clientLevelSensor(SENSOR_NAME_1, level, new Sensor[0]);

        MatcherAssert.assertThat(actual, CoreMatchers.is(CoreMatchers.equalToObject(this.sensor)));
    }

    // With a sensor already registered, clientLevelSensor must return that existing sensor.
    @Test
    public void shouldGetExistingClientLevelSensor() {
        final Metrics mockedMetrics = Mockito.mock(Metrics.class);
        final Sensor.RecordingLevel level = Sensor.RecordingLevel.INFO;
        setupGetExistingSensorTest(mockedMetrics);
        final StreamsMetricsImpl underTest = new StreamsMetricsImpl(mockedMetrics, CLIENT_ID, VERSION, this.time);

        final Sensor actual = underTest.clientLevelSensor(SENSOR_NAME_1, level, new Sensor[0]);

        MatcherAssert.assertThat(actual, CoreMatchers.is(CoreMatchers.equalToObject(this.sensor)));
    }

    // Adding a client-level immutable metric must register it with the expected name, config and an
    // ImmutableMetricValue wrapping the given value. The strict-stubs setting on this class makes
    // any unused stubbing fail the test.
    @Test
    public void shouldAddClientLevelImmutableMetric() {
        final Metrics mockedMetrics = Mockito.mock(Metrics.class);
        final Sensor.RecordingLevel level = Sensor.RecordingLevel.INFO;
        final MetricConfig expectedConfig = new MetricConfig().recordLevel(level);
        final String value = "immutable-value";
        final StreamsMetricsImpl.ImmutableMetricValue expectedProvider = new StreamsMetricsImpl.ImmutableMetricValue(value);
        Mockito.when(mockedMetrics.metricName(METRIC_NAME1, "stream-metrics", DESCRIPTION1, this.clientLevelTags))
            .thenReturn(this.metricName1);
        // The three matchers must be created inline and in argument order.
        Mockito.doNothing().when(mockedMetrics).addMetric(
            ArgumentMatchers.eq(this.metricName1),
            eqMetricConfig(expectedConfig),
            ArgumentMatchers.eq(expectedProvider));
        final StreamsMetricsImpl underTest = new StreamsMetricsImpl(mockedMetrics, CLIENT_ID, VERSION, this.time);

        underTest.addClientLevelImmutableMetric(METRIC_NAME1, DESCRIPTION1, level, value);
    }

    // Adding a client-level mutable metric must register it with the expected name, config and the
    // very gauge that was passed in. The strict-stubs setting on this class makes any unused
    // stubbing fail the test.
    @Test
    public void shouldAddClientLevelMutableMetric() {
        final Metrics mockedMetrics = Mockito.mock(Metrics.class);
        final Sensor.RecordingLevel level = Sensor.RecordingLevel.INFO;
        final MetricConfig expectedConfig = new MetricConfig().recordLevel(level);
        final Gauge<String> provider = (config, now) -> "mutable-value";
        Mockito.when(mockedMetrics.metricName(METRIC_NAME1, "stream-metrics", DESCRIPTION1, this.clientLevelTags))
            .thenReturn(this.metricName1);
        // The three matchers must be created inline and in argument order.
        Mockito.doNothing().when(mockedMetrics).addMetric(
            ArgumentMatchers.eq(this.metricName1),
            eqMetricConfig(expectedConfig),
            ArgumentMatchers.eq(provider));
        final StreamsMetricsImpl underTest = new StreamsMetricsImpl(mockedMetrics, CLIENT_ID, VERSION, this.time);

        underTest.addClientLevelMutableMetric(METRIC_NAME1, DESCRIPTION1, level, provider);
    }

    // Pins the string values of two public StreamsMetricsImpl constants. The previous body compared
    // each literal with itself (the constants had been inlined), so it could never fail.
    // NOTE(review): assumes StreamsMetricsImpl exposes LATENCY_SUFFIX and ROLLUP_VALUE — confirm.
    @Test
    public void shouldProvideCorrectStrings() {
        MatcherAssert.assertThat(StreamsMetricsImpl.LATENCY_SUFFIX, CoreMatchers.is("-latency"));
        MatcherAssert.assertThat(StreamsMetricsImpl.ROLLUP_VALUE, CoreMatchers.is("all"));
    }

    /**
     * Stubs removal of the two sensors named {@code internal.<str>.s.sensor1} and
     * {@code internal.<str>.s.sensor2} on {@code metrics}.
     *
     * @param metrics the mocked registry to stub
     * @param str     the level-specific part of the sensor name prefix (e.g. a thread id)
     */
    private void setupRemoveSensorsTest(Metrics metrics, String str) {
        final String fullPrefix = INTERNAL_PREFIX + SENSOR_PREFIX_DELIMITER + str + SENSOR_NAME_DELIMITER;
        for (final String sensorName : new String[] {SENSOR_NAME_1, SENSOR_NAME_2}) {
            Mockito.doNothing().when(metrics).removeSensor(fullPrefix + sensorName);
        }
    }

    // Removing all client-level state must remove the two sensors captured at indices 0 and 1 and
    // both client-level metrics. The strict-stubs setting on this class makes any unused stubbing
    // fail the test.
    @Test
    public void shouldRemoveClientLevelMetricsAndSensors() {
        final Metrics mockedMetrics = Mockito.mock(Metrics.class);
        final StreamsMetricsImpl underTest = new StreamsMetricsImpl(mockedMetrics, CLIENT_ID, VERSION, this.time);
        final ArgumentCaptor<String> sensorNames = addSensorsOnAllLevels(mockedMetrics, underTest);

        Mockito.doNothing().when(mockedMetrics).removeSensor(sensorNames.getAllValues().get(0));
        Mockito.doNothing().when(mockedMetrics).removeSensor(sensorNames.getAllValues().get(1));
        Mockito.when(mockedMetrics.removeMetric(this.metricName1)).thenReturn(null);
        Mockito.when(mockedMetrics.removeMetric(this.metricName2)).thenReturn(null);

        underTest.removeAllClientLevelSensorsAndMetrics();
    }

    // Removing all thread-level sensors for THREAD_ID1 must remove both sensors stubbed by
    // setupRemoveSensorsTest.
    @Test
    public void shouldRemoveThreadLevelSensors() {
        final Metrics mockedMetrics = Mockito.mock(Metrics.class);
        final StreamsMetricsImpl underTest = new StreamsMetricsImpl(mockedMetrics, CLIENT_ID, VERSION, this.time);
        addSensorsOnAllLevels(mockedMetrics, underTest);
        setupRemoveSensorsTest(mockedMetrics, THREAD_ID1);

        underTest.removeAllThreadLevelSensors(THREAD_ID1);
    }

    // Constructing StreamsMetricsImpl with a null Metrics registry must throw a NullPointerException.
    @Test
    public void testNullMetrics() {
        Assertions.assertThrows(
            NullPointerException.class,
            () -> new StreamsMetricsImpl((Metrics) null, "", VERSION, this.time));
    }

    // Removing a null sensor must throw a NullPointerException.
    @Test
    public void testRemoveNullSensor() {
        Assertions.assertThrows(
            NullPointerException.class,
            () -> this.streamsMetrics.removeSensor((Sensor) null));
    }

    // Adds and removes sensors of several kinds and expects no parent-sensor bookkeeping to remain.
    @Test
    public void testRemoveSensor() {
        final Sensor.RecordingLevel debug = Sensor.RecordingLevel.DEBUG;

        // Plain sensor.
        final Sensor parent = this.streamsMetrics.addSensor(SENSOR_NAME_1, debug);
        this.streamsMetrics.removeSensor(parent);

        // Sensor registered with the (already removed) sensor as its parent.
        final Sensor child = this.streamsMetrics.addSensor(SENSOR_NAME_1, debug, new Sensor[]{parent});
        this.streamsMetrics.removeSensor(child);

        // Latency/rate/total sensor.
        final Sensor latencyRateTotal =
            this.streamsMetrics.addLatencyRateTotalSensor("scope", "entity", "put", debug, new String[0]);
        this.streamsMetrics.removeSensor(latencyRateTotal);

        // Rate/total sensor.
        final Sensor rateTotal =
            this.streamsMetrics.addRateTotalSensor("scope", "entity", "put", debug, new String[0]);
        this.streamsMetrics.removeSensor(rateTotal);

        Assertions.assertEquals(Collections.emptyMap(), this.streamsMetrics.parentSensors());
    }

    /**
     * Registers task-, node- and topic-level sensors against a fresh {@code Metrics} registry and
     * checks the number of registered metrics after each add/remove step, ending with an empty
     * registry once the task-level sensors are removed.
     */
    @Test
    public void testMultiLevelSensorRemoval() {
        Metrics registry = new Metrics();
        StreamsMetricsImpl streamsMetricsImpl = new StreamsMetricsImpl(registry, THREAD_ID1, VERSION, this.time);
        // Drop whatever is registered up front so the final "zero metrics" assertion is meaningful.
        for (MetricName metricName : registry.metrics().keySet()) {
            registry.removeMetric(metricName);
        }

        Map<String, String> taskTags = Utils.mkMap(Utils.mkEntry("tkey", "value"));
        Map<String, String> nodeTags = Utils.mkMap(Utils.mkEntry("nkey", "value"));
        Map<String, String> topicTags = Utils.mkMap(Utils.mkEntry("tkey", "value"));

        // Task level.
        Sensor taskSensor = streamsMetricsImpl.taskLevelSensor(
            THREAD_ID1, "taskName", "operation", Sensor.RecordingLevel.DEBUG, new Sensor[0]);
        StreamsMetricsImpl.addAvgAndMaxLatencyToSensor(taskSensor, "stream-processor-node-metrics", taskTags, "operation");
        StreamsMetricsImpl.addInvocationRateAndCountToSensor(
            taskSensor, "stream-processor-node-metrics", taskTags, "operation", "", "");
        int taskMetricCount = registry.metrics().size();

        // Node level, with the task sensor passed along.
        Sensor nodeSensor = streamsMetricsImpl.nodeLevelSensor(
            THREAD_ID1, "taskName", "processorNodeName", "operation", Sensor.RecordingLevel.DEBUG, new Sensor[]{taskSensor});
        StreamsMetricsImpl.addAvgAndMaxLatencyToSensor(nodeSensor, "stream-processor-node-metrics", nodeTags, "operation");
        StreamsMetricsImpl.addInvocationRateAndCountToSensor(
            nodeSensor, "stream-processor-node-metrics", nodeTags, "operation", "", "");
        MatcherAssert.assertThat(registry.metrics().size(), Matchers.greaterThan(taskMetricCount));
        int nodeMetricCount = registry.metrics().size();

        // Topic level, with the node sensor passed along.
        Sensor topicSensor = streamsMetricsImpl.topicLevelSensor(
            THREAD_ID1, "taskName", "processorNodeName", "topicName", "operation", Sensor.RecordingLevel.DEBUG,
            new Sensor[]{nodeSensor});
        StreamsMetricsImpl.addAvgAndMaxLatencyToSensor(topicSensor, "stream-topic-metrics", topicTags, "operation");
        StreamsMetricsImpl.addInvocationRateAndCountToSensor(
            topicSensor, "stream-topic-metrics", topicTags, "operation", "", "");
        MatcherAssert.assertThat(registry.metrics().size(), Matchers.greaterThan(nodeMetricCount));

        // Removing a level brings the count back to what it was before that level was added.
        streamsMetricsImpl.removeAllTopicLevelSensors(THREAD_ID1, "taskName", "processorNodeName", "topicName");
        MatcherAssert.assertThat(registry.metrics().size(), Matchers.equalTo(nodeMetricCount));
        streamsMetricsImpl.removeAllNodeLevelSensors(THREAD_ID1, "taskName", "processorNodeName");
        MatcherAssert.assertThat(registry.metrics().size(), Matchers.equalTo(taskMetricCount));

        // Requesting the task-level sensor again must not change the metric count.
        Sensor taskSensorAgain = streamsMetricsImpl.taskLevelSensor(
            THREAD_ID1, "taskName", "operation", Sensor.RecordingLevel.DEBUG, new Sensor[0]);
        StreamsMetricsImpl.addAvgAndMaxLatencyToSensor(taskSensorAgain, "stream-processor-node-metrics", taskTags, "operation");
        StreamsMetricsImpl.addInvocationRateAndCountToSensor(
            taskSensorAgain, "stream-processor-node-metrics", taskTags, "operation", "", "");
        MatcherAssert.assertThat(registry.metrics().size(), Matchers.equalTo(taskMetricCount));

        // Re-adding and removing the node level behaves the same as the first time.
        Sensor nodeSensorAgain = streamsMetricsImpl.nodeLevelSensor(
            THREAD_ID1, "taskName", "processorNodeName", "operation", Sensor.RecordingLevel.DEBUG,
            new Sensor[]{taskSensorAgain});
        StreamsMetricsImpl.addAvgAndMaxLatencyToSensor(nodeSensorAgain, "stream-processor-node-metrics", nodeTags, "operation");
        StreamsMetricsImpl.addInvocationRateAndCountToSensor(
            nodeSensorAgain, "stream-processor-node-metrics", nodeTags, "operation", "", "");
        MatcherAssert.assertThat(registry.metrics().size(), Matchers.greaterThan(taskMetricCount));
        streamsMetricsImpl.removeAllNodeLevelSensors(THREAD_ID1, "taskName", "processorNodeName");
        MatcherAssert.assertThat(registry.metrics().size(), Matchers.equalTo(taskMetricCount));

        streamsMetricsImpl.removeAllTaskLevelSensors(THREAD_ID1, "taskName");
        MatcherAssert.assertThat(registry.metrics().size(), Matchers.equalTo(0));
    }

    /**
     * Adding a latency/rate/total sensor is expected to register four additional metrics, and
     * removing the sensor is expected to restore the previous metric count.
     */
    @Test
    public void testLatencyMetrics() {
        int metricsBefore = this.streamsMetrics.metrics().size();

        Sensor latencySensor = this.streamsMetrics.addLatencyRateTotalSensor(
            "scope", "entity", "put", Sensor.RecordingLevel.DEBUG, new String[0]);
        int expectedAfterAdd = metricsBefore + 2 + 2;
        Assertions.assertEquals(expectedAfterAdd, this.streamsMetrics.metrics().size());

        this.streamsMetrics.removeSensor(latencySensor);
        Assertions.assertEquals(metricsBefore, this.streamsMetrics.metrics().size());
    }

    /**
     * Adding a rate/total sensor is expected to register two additional metrics, and removing the
     * sensor is expected to restore the previous metric count.
     */
    @Test
    public void testThroughputMetrics() {
        int metricsBefore = this.streamsMetrics.metrics().size();

        Sensor throughputSensor = this.streamsMetrics.addRateTotalSensor(
            "scope", "entity", "put", Sensor.RecordingLevel.DEBUG, new String[0]);
        int expectedAfterAdd = metricsBefore + 2;
        Assertions.assertEquals(expectedAfterAdd, this.streamsMetrics.metrics().size());

        this.streamsMetrics.removeSensor(throughputSensor);
        Assertions.assertEquals(metricsBefore, this.streamsMetrics.metrics().size());
    }

    /**
     * Records ten values with a 1 ms metric time window and checks before each recording that the
     * {@code op-total} metric equals the number of recordings made so far.
     */
    @Test
    public void testTotalMetricDoesntDecrease() {
        MockTime clock = new MockTime(1L);
        MetricConfig config = new MetricConfig().timeWindow(1L, TimeUnit.MILLISECONDS);
        Metrics registry = new Metrics(config, clock);
        StreamsMetricsImpl streamsMetricsImpl = new StreamsMetricsImpl(registry, "", VERSION, clock);

        Sensor latencySensor = streamsMetricsImpl.addLatencyRateTotalSensor(
            "scope", "entity", "op", Sensor.RecordingLevel.INFO, new String[0]);

        MetricName totalMetricName = registry.metricName(
            "op-total",
            "stream-scope-metrics",
            "",
            new String[]{THREAD_ID_TAG, Thread.currentThread().getName(), "scope-id", "entity"});
        KafkaMetric totalMetric = registry.metric(totalMetricName);

        for (int recordedSoFar = 0; recordedSoFar < 10; recordedSoFar++) {
            long measuredTotal = Math.round(totalMetric.measurable().measure(config, clock.milliseconds()));
            Assertions.assertEquals(recordedSoFar, measuredTotal);
            latencySensor.record(100.0d, clock.milliseconds());
        }
    }

    /** A latency/rate/total sensor without custom tags must expose the four expected metrics. */
    @Test
    public void shouldAddLatencyRateTotalSensor() {
        StreamsMetricsImpl streamsMetricsImpl = new StreamsMetricsImpl(this.metrics, CLIENT_ID, VERSION, this.time);
        Sensor latencySensor = streamsMetricsImpl.addLatencyRateTotalSensor(
            SCOPE_NAME, ENTITY_NAME, OPERATION_NAME, Sensor.RecordingLevel.DEBUG, new String[0]);
        List<String> expectedMetricNames = Arrays.asList(
            "test-operation-latency-avg",
            "test-operation-latency-max",
            "test-operation-total",
            "test-operation-rate");

        shouldAddCustomSensor(latencySensor, streamsMetricsImpl, expectedMetricNames);
    }

    /** A rate/total sensor without custom tags must expose the total and rate metrics. */
    @Test
    public void shouldAddRateTotalSensor() {
        StreamsMetricsImpl streamsMetricsImpl = new StreamsMetricsImpl(this.metrics, CLIENT_ID, VERSION, this.time);
        Sensor throughputSensor = streamsMetricsImpl.addRateTotalSensor(
            SCOPE_NAME, ENTITY_NAME, OPERATION_NAME, Sensor.RecordingLevel.DEBUG, new String[0]);
        List<String> expectedMetricNames = Arrays.asList("test-operation-total", "test-operation-rate");

        shouldAddCustomSensor(throughputSensor, streamsMetricsImpl, expectedMetricNames);
    }

    /** A latency/rate/total sensor created with two custom tag pairs must carry those tags on its metrics. */
    @Test
    public void shouldAddLatencyRateTotalSensorWithCustomTags() {
        String[] customTagPairs = {CUSTOM_TAG_KEY1, CUSTOM_TAG_VALUE1, CUSTOM_TAG_KEY2, CUSTOM_TAG_VALUE2};
        Sensor latencySensor = this.streamsMetrics.addLatencyRateTotalSensor(
            SCOPE_NAME, ENTITY_NAME, OPERATION_NAME, Sensor.RecordingLevel.DEBUG, customTagPairs);
        List<String> expectedMetricNames = Arrays.asList(
            "test-operation-latency-avg",
            "test-operation-latency-max",
            "test-operation-total",
            "test-operation-rate");
        Map<String, String> expectedTags = customTags(this.streamsMetrics);

        shouldAddCustomSensorWithTags(latencySensor, expectedMetricNames, expectedTags);
    }

    /** A rate/total sensor created with two custom tag pairs must carry those tags on its metrics. */
    @Test
    public void shouldAddRateTotalSensorWithCustomTags() {
        String[] customTagPairs = {CUSTOM_TAG_KEY1, CUSTOM_TAG_VALUE1, CUSTOM_TAG_KEY2, CUSTOM_TAG_VALUE2};
        Sensor throughputSensor = this.streamsMetrics.addRateTotalSensor(
            SCOPE_NAME, ENTITY_NAME, OPERATION_NAME, Sensor.RecordingLevel.DEBUG, customTagPairs);
        List<String> expectedMetricNames = Arrays.asList("test-operation-total", "test-operation-rate");
        Map<String, String> expectedTags = customTags(this.streamsMetrics);

        shouldAddCustomSensorWithTags(throughputSensor, expectedMetricNames, expectedTags);
    }

    /**
     * Checks a custom sensor against the default tag set derived from the given
     * {@code StreamsMetricsImpl}.
     *
     * @param sensor             the sensor under inspection
     * @param streamsMetricsImpl instance used to derive the expected tags
     * @param list               names of the metrics expected to be registered
     */
    private void shouldAddCustomSensor(Sensor sensor, StreamsMetricsImpl streamsMetricsImpl, List<String> list) {
        Map<String, String> expectedTags = tags(streamsMetricsImpl);
        shouldAddCustomSensorWithTags(sensor, list, expectedTags);
    }

    /**
     * Asserts that the sensor has metrics, carries the expected external sensor name, and that
     * each listed metric is registered in group {@code stream-test-scope-metrics} with the given tags.
     *
     * @param sensor the sensor under inspection
     * @param list   names of the metrics expected to be registered
     * @param map    tags every expected metric must carry
     */
    private void shouldAddCustomSensorWithTags(Sensor sensor, List<String> list, Map<String, String> map) {
        Assertions.assertTrue(sensor.hasMetrics());

        String expectedSensorName = "external." + Thread.currentThread().getName()
            + ".entity." + ENTITY_NAME + SENSOR_NAME_DELIMITER + OPERATION_NAME;
        MatcherAssert.assertThat(sensor.name(), CoreMatchers.is(expectedSensorName));

        for (String metricName : list) {
            Assertions.assertTrue(
                StreamsTestUtils.containsMetric(this.metrics, metricName, "stream-test-scope-metrics", map));
        }
    }

    /**
     * Builds the default tag map expected on custom-sensor metrics.
     *
     * <p>The first tag is keyed by {@code THREAD_ID_TAG} when the instance reports
     * {@code Version.LATEST} and by {@code "client-id"} otherwise; its value is the current thread's
     * name. The second tag maps {@code "test-scope-id"} to {@code ENTITY_NAME}.
     *
     * @param streamsMetricsImpl instance whose version selects the first tag key
     * @return the tag map produced by {@code Utils.mkMap}; {@code customTags} adds entries to it
     */
    private Map<String, String> tags(StreamsMetricsImpl streamsMetricsImpl) {
        String threadTagKey =
            streamsMetricsImpl.version() == StreamsMetricsImpl.Version.LATEST ? THREAD_ID_TAG : "client-id";
        // Typed varargs instead of a raw Map.Entry[] keeps the call free of unchecked conversions.
        return Utils.mkMap(
            Utils.mkEntry(threadTagKey, Thread.currentThread().getName()),
            Utils.mkEntry("test-scope-id", ENTITY_NAME));
    }

    /**
     * Returns the default tags from {@code tags(...)} extended with the two custom tag pairs.
     *
     * @param streamsMetricsImpl instance used to derive the default tags
     * @return the default tag map with both custom tags added
     */
    private Map<String, String> customTags(StreamsMetricsImpl streamsMetricsImpl) {
        Map<String, String> extendedTags = tags(streamsMetricsImpl);
        extendedTags.put(CUSTOM_TAG_KEY1, CUSTOM_TAG_VALUE1);
        extendedTags.put(CUSTOM_TAG_KEY2, CUSTOM_TAG_VALUE2);
        return extendedTags;
    }

    /** An odd number of tag strings must be rejected with a descriptive {@code IllegalArgumentException}. */
    @Test
    public void shouldThrowIfLatencyRateTotalSensorIsAddedWithOddTags() {
        IllegalArgumentException thrown = Assertions.assertThrows(
            IllegalArgumentException.class,
            () -> this.streamsMetrics.addLatencyRateTotalSensor(
                SCOPE_NAME, ENTITY_NAME, OPERATION_NAME, Sensor.RecordingLevel.DEBUG, new String[]{"bad-tag"}));

        MatcherAssert.assertThat(thrown.getMessage(), CoreMatchers.is("Tags needs to be specified in key-value pairs"));
    }

    /** An odd number of tag strings must be rejected with a descriptive {@code IllegalArgumentException}. */
    @Test
    public void shouldThrowIfRateTotalSensorIsAddedWithOddTags() {
        IllegalArgumentException thrown = Assertions.assertThrows(
            IllegalArgumentException.class,
            () -> this.streamsMetrics.addRateTotalSensor(
                SCOPE_NAME, ENTITY_NAME, OPERATION_NAME, Sensor.RecordingLevel.DEBUG, new String[]{"bad-tag"}));

        MatcherAssert.assertThat(thrown.getMessage(), CoreMatchers.is("Tags needs to be specified in key-value pairs"));
    }

    /** The client-level tag map must contain exactly one entry: {@code client-id} mapped to {@code CLIENT_ID}. */
    @Test
    public void shouldGetClientLevelTagMap() {
        // Typed map: no raw-type warning and no cast needed on lookup.
        Map<String, String> clientLevelTags = this.streamsMetrics.clientLevelTagMap();

        MatcherAssert.assertThat(clientLevelTags.size(), Matchers.equalTo(1));
        MatcherAssert.assertThat(clientLevelTags.get("client-id"), Matchers.equalTo(CLIENT_ID));
    }

    /**
     * The store-level tag map must contain three entries: the current thread's name, the task id,
     * and a {@code <storeType>-state-id} key mapped to the store name.
     */
    @Test
    public void shouldGetStoreLevelTagMap() {
        StreamsMetricsImpl streamsMetricsImpl = new StreamsMetricsImpl(this.metrics, THREAD_ID1, VERSION, this.time);

        // Typed map: no raw-type warning and no casts needed on lookup.
        Map<String, String> storeLevelTags =
            streamsMetricsImpl.storeLevelTagMap("test-task", "remote-window", "window-keeper");

        MatcherAssert.assertThat(storeLevelTags.size(), Matchers.equalTo(3));
        MatcherAssert.assertThat(storeLevelTags.get(THREAD_ID_TAG), Matchers.equalTo(Thread.currentThread().getName()));
        MatcherAssert.assertThat(storeLevelTags.get(TASK_ID_TAG), Matchers.equalTo("test-task"));
        MatcherAssert.assertThat(storeLevelTags.get("remote-window-state-id"), Matchers.equalTo("window-keeper"));
    }

    /** The cache-level tag map must contain three entries: thread id, task id and record-cache id. */
    @Test
    public void shouldGetCacheLevelTagMap() {
        StreamsMetricsImpl streamsMetricsImpl = new StreamsMetricsImpl(this.metrics, THREAD_ID1, VERSION, this.time);

        // Typed map: no raw-type warning and no casts needed on lookup.
        Map<String, String> cacheLevelTags = streamsMetricsImpl.cacheLevelTagMap(THREAD_ID1, "taskName", "storeName");

        MatcherAssert.assertThat(cacheLevelTags.size(), Matchers.equalTo(3));
        MatcherAssert.assertThat(cacheLevelTags.get(THREAD_ID_TAG), Matchers.equalTo(THREAD_ID1));
        MatcherAssert.assertThat(cacheLevelTags.get(TASK_ID_TAG), Matchers.equalTo("taskName"));
        MatcherAssert.assertThat(cacheLevelTags.get(RECORD_CACHE_ID_TAG), Matchers.equalTo("storeName"));
    }

    /** The thread-level tag map must contain exactly one entry: the thread-id tag mapped to the given thread id. */
    @Test
    public void shouldGetThreadLevelTagMap() {
        StreamsMetricsImpl streamsMetricsImpl = new StreamsMetricsImpl(this.metrics, THREAD_ID1, VERSION, this.time);

        // Typed map: no raw-type warning and no cast needed on lookup.
        Map<String, String> threadLevelTags = streamsMetricsImpl.threadLevelTagMap(THREAD_ID1);

        MatcherAssert.assertThat(threadLevelTags.size(), Matchers.equalTo(1));
        MatcherAssert.assertThat(threadLevelTags.get(THREAD_ID_TAG), Matchers.equalTo(THREAD_ID1));
    }

    /**
     * Stubs {@code Sensor.add} for the expected rate metric name and invokes
     * {@code addInvocationRateToSensor}; the class-level strict-stubs setting fails the test if the
     * stubbed call is never made.
     */
    @Test
    public void shouldAddInvocationRateToSensor() {
        Sensor mockedSensor = Mockito.mock(Sensor.class);
        MetricName expectedMetricName = new MetricName("test-metric1-rate", "group", DESCRIPTION1, this.tags);
        Mockito.when(mockedSensor.add(ArgumentMatchers.eq(expectedMetricName), ArgumentMatchers.any(Rate.class)))
            .thenReturn(true);

        StreamsMetricsImpl.addInvocationRateToSensor(mockedSensor, "group", this.tags, METRIC_NAME1, DESCRIPTION1);
    }

    /**
     * Registers rate-of-sum and sum metrics on the shared sensor and checks both values.
     * Each {@code verifyMetric} call records 18 and 72 again, so the total check sees 180.
     */
    @Test
    public void shouldAddAmountRateAndSum() {
        StreamsMetricsImpl.addRateOfSumAndSumMetricsToSensor(
            this.sensor, "group", this.tags, "metric", DESCRIPTION1, DESCRIPTION2);

        long defaultWindowSeconds = Duration.ofMillis(new MetricConfig().timeWindowMs()).getSeconds();
        double expectedRate = 90.0d / defaultWindowSeconds;
        verifyMetric("metric-rate", DESCRIPTION1, 18.0d, 72.0d, expectedRate);
        verifyMetric("metric-total", DESCRIPTION2, 18.0d, 72.0d, 180.0d);

        MatcherAssert.assertThat(this.metrics.metrics().size(), Matchers.equalTo(3));
    }

    /** Registers a sum metric on the shared sensor and checks that recording 18 and 42 yields 60. */
    @Test
    public void shouldAddSum() {
        StreamsMetricsImpl.addSumMetricToSensor(this.sensor, "group", this.tags, "metric", DESCRIPTION1);

        verifyMetric("metric-total", DESCRIPTION1, 18.0d, 42.0d, 60.0d);

        int registeredMetrics = this.metrics.metrics().size();
        MatcherAssert.assertThat(registeredMetrics, Matchers.equalTo(2));
    }

    /**
     * Registers a rate-of-sum metric on the shared sensor and checks that recording 18 and 72
     * yields 90 divided by the default metric time window in seconds.
     */
    @Test
    public void shouldAddAmountRate() {
        StreamsMetricsImpl.addRateOfSumMetricToSensor(this.sensor, "group", this.tags, "metric", DESCRIPTION1);

        long defaultWindowSeconds = Duration.ofMillis(new MetricConfig().timeWindowMs()).getSeconds();
        double expectedRate = 90.0d / defaultWindowSeconds;
        verifyMetric("metric-rate", DESCRIPTION1, 18.0d, 72.0d, expectedRate);

        MatcherAssert.assertThat(this.metrics.metrics().size(), Matchers.equalTo(2));
    }

    /**
     * Registers a value metric on the shared sensor and checks that the metric reports the most
     * recently recorded value after each recording.
     */
    @Test
    public void shouldAddValue() {
        StreamsMetricsImpl.addValueMetricToSensor(this.sensor, "group", this.tags, "metric", DESCRIPTION1);

        MetricName valueMetricName = new MetricName("metric", "group", DESCRIPTION1, this.tags);
        KafkaMetric valueMetric = this.metrics.metric(valueMetricName);
        MatcherAssert.assertThat(valueMetric, CoreMatchers.is(CoreMatchers.notNullValue()));

        MetricConfig config = new MetricConfig();

        this.sensor.record(42.0d);
        double afterFirstRecord = valueMetric.measurable().measure(config, this.time.milliseconds());
        MatcherAssert.assertThat(afterFirstRecord, Matchers.equalTo(42.0d));

        this.sensor.record(18.0d);
        double afterSecondRecord = valueMetric.measurable().measure(config, this.time.milliseconds());
        MatcherAssert.assertThat(afterSecondRecord, Matchers.equalTo(18.0d));

        MatcherAssert.assertThat(this.metrics.metrics().size(), Matchers.equalTo(2));
    }

    /**
     * Registers the {@code total-total} and {@code count-total} metrics on the shared sensor and checks both.
     * Each {@code verifyMetric} call records 18 and 42 again, so the second check sees four recordings.
     */
    @Test
    public void shouldAddTotalCountAndSumMetricsToSensor() {
        StreamsMetricsImpl.addTotalCountAndSumMetricsToSensor(
            this.sensor, "group", this.tags, "total", "count", DESCRIPTION1, DESCRIPTION2);

        verifyMetric("total-total", DESCRIPTION1, 18.0d, 42.0d, 2.0d);
        verifyMetric("count-total", DESCRIPTION2, 18.0d, 42.0d, 120.0d);

        int registeredMetrics = this.metrics.metrics().size();
        MatcherAssert.assertThat(registeredMetrics, Matchers.equalTo(3));
    }

    /**
     * Registers avg and sum metrics on the shared sensor and checks both.
     * Each {@code verifyMetric} call records 18 and 42 again, so the total check sees 120.
     */
    @Test
    public void shouldAddAvgAndTotalMetricsToSensor() {
        StreamsMetricsImpl.addAvgAndSumMetricsToSensor(
            this.sensor, "group", this.tags, "metric", DESCRIPTION1, DESCRIPTION2);

        verifyMetric("metric-avg", DESCRIPTION1, 18.0d, 42.0d, 30.0d);
        verifyMetric("metric-total", DESCRIPTION2, 18.0d, 42.0d, 120.0d);

        int registeredMetrics = this.metrics.metrics().size();
        MatcherAssert.assertThat(registeredMetrics, Matchers.equalTo(3));
    }

    /** Registers avg, min and max metrics on the shared sensor and checks each against recordings of 18 and 42. */
    @Test
    public void shouldAddAvgAndMinAndMaxMetricsToSensor() {
        StreamsMetricsImpl.addAvgAndMinAndMaxToSensor(
            this.sensor, "group", this.tags, "metric", DESCRIPTION1, DESCRIPTION2, DESCRIPTION3);

        verifyMetric("metric-avg", DESCRIPTION1, 18.0d, 42.0d, 30.0d);
        verifyMetric("metric-min", DESCRIPTION2, 18.0d, 42.0d, 18.0d);
        verifyMetric("metric-max", DESCRIPTION3, 18.0d, 42.0d, 42.0d);

        int registeredMetrics = this.metrics.metrics().size();
        MatcherAssert.assertThat(registeredMetrics, Matchers.equalTo(4));
    }

    /** Registers min and max metrics on the shared sensor and checks each against recordings of 18 and 42. */
    @Test
    public void shouldAddMinAndMaxMetricsToSensor() {
        StreamsMetricsImpl.addMinAndMaxToSensor(
            this.sensor, "group", this.tags, "metric", DESCRIPTION1, DESCRIPTION2);

        verifyMetric("metric-min", DESCRIPTION1, 18.0d, 42.0d, 18.0d);
        verifyMetric("metric-max", DESCRIPTION2, 18.0d, 42.0d, 42.0d);

        int registeredMetrics = this.metrics.metrics().size();
        MatcherAssert.assertThat(registeredMetrics, Matchers.equalTo(3));
    }

    /** An instance built with {@code VERSION} must report {@code Version.LATEST}. */
    @Test
    public void shouldReturnMetricsVersionCurrent() {
        StreamsMetricsImpl streamsMetricsImpl = new StreamsMetricsImpl(this.metrics, THREAD_ID1, VERSION, this.time);

        MatcherAssert.assertThat(streamsMetricsImpl.version(), Matchers.equalTo(StreamsMetricsImpl.Version.LATEST));
    }

    /**
     * Looks up a metric in group {@code "group"} with the shared tags, records two values on the
     * shared sensor, and asserts the metric's measured value.
     *
     * @param str  metric name
     * @param str2 metric description, used for lookup and asserted on the found metric
     * @param d    first value recorded on the sensor
     * @param d2   second value recorded on the sensor
     * @param d3   value the metric is expected to report after both recordings
     */
    private void verifyMetric(String str, String str2, double d, double d2, double d3) {
        MetricName lookupName = new MetricName(str, "group", str2, this.tags);
        KafkaMetric found = this.metrics.metric(lookupName);
        MatcherAssert.assertThat(found, CoreMatchers.is(CoreMatchers.notNullValue()));
        MatcherAssert.assertThat(found.metricName().description(), Matchers.equalTo(str2));

        this.sensor.record(d, this.time.milliseconds());
        this.sensor.record(d2, this.time.milliseconds());

        double measured = found.measurable().measure(new MetricConfig(), this.time.milliseconds());
        MatcherAssert.assertThat(measured, Matchers.equalTo(d3));
    }

    /**
     * With a recording sensor that has metrics and a clock returning 6 then 10 nanoseconds,
     * {@code maybeMeasureLatency} is expected to record the difference (4). The stubs double as
     * expectations under the class-level strict-stubs setting.
     */
    @Test
    public void shouldMeasureLatency() {
        Sensor mockedSensor = Mockito.mock(Sensor.class);
        Mockito.when(mockedSensor.shouldRecord()).thenReturn(true);
        Mockito.when(mockedSensor.hasMetrics()).thenReturn(true);
        Mockito.doNothing().when(mockedSensor).record(4.0d);

        Time mockedTime = Mockito.mock(Time.class);
        // Consecutive answers: first call yields 6, second yields 10.
        Mockito.when(mockedTime.nanoseconds()).thenReturn(6L, 10L);

        StreamsMetricsImpl.maybeMeasureLatency(() -> { }, mockedTime, mockedSensor);
    }

    /**
     * When the sensor reports that it should not record, {@code maybeMeasureLatency} must neither
     * read the clock nor record a value on the sensor.
     */
    @Test
    public void shouldNotMeasureLatencyDueToRecordingLevel() {
        Sensor mockedSensor = Mockito.mock(Sensor.class);
        Mockito.when(mockedSensor.shouldRecord()).thenReturn(false);
        Time mockedTime = Mockito.mock(Time.class);

        StreamsMetricsImpl.maybeMeasureLatency(() -> { }, mockedTime, mockedSensor);

        // Make the expectation in the test name explicit instead of only checking "does not throw".
        Mockito.verifyNoInteractions(mockedTime);
        Mockito.verify(mockedSensor, Mockito.never()).record(ArgumentMatchers.anyDouble());
    }

    /**
     * When the sensor should record but has no metrics, {@code maybeMeasureLatency} must neither
     * read the clock nor record a value on the sensor.
     */
    @Test
    public void shouldNotMeasureLatencyBecauseSensorHasNoMetrics() {
        Sensor mockedSensor = Mockito.mock(Sensor.class);
        Mockito.when(mockedSensor.shouldRecord()).thenReturn(true);
        Mockito.when(mockedSensor.hasMetrics()).thenReturn(false);
        Time mockedTime = Mockito.mock(Time.class);

        StreamsMetricsImpl.maybeMeasureLatency(() -> { }, mockedTime, mockedSensor);

        // Make the expectation in the test name explicit instead of only checking "does not throw".
        Mockito.verifyNoInteractions(mockedTime);
        Mockito.verify(mockedSensor, Mockito.never()).record(ArgumentMatchers.anyDouble());
    }

    /**
     * A mutable thread-level metric must be registered in group {@code stream-thread-metrics}
     * under the thread-id tag and report the value supplied by its provider.
     */
    @Test
    public void shouldAddThreadLevelMutableMetric() {
        StreamsMetricsImpl streamsMetricsImpl = new StreamsMetricsImpl(this.metrics, THREAD_ID1, VERSION, this.time);
        streamsMetricsImpl.addThreadLevelMutableMetric("foobar", "test metric", "t1", (config, now) -> 123);

        MetricName expectedName = this.metrics.metricName(
            "foobar", "stream-thread-metrics", Collections.singletonMap(THREAD_ID_TAG, "t1"));
        MatcherAssert.assertThat(this.metrics.metric(expectedName), CoreMatchers.notNullValue());
        MatcherAssert.assertThat(this.metrics.metric(expectedName).metricValue(), Matchers.equalTo(123));
    }

    /** After {@code removeAllThreadLevelMetrics}, a previously added mutable thread-level metric must be gone. */
    @Test
    public void shouldCleanupThreadLevelMutableMetric() {
        StreamsMetricsImpl streamsMetricsImpl = new StreamsMetricsImpl(this.metrics, THREAD_ID1, VERSION, this.time);
        streamsMetricsImpl.addThreadLevelMutableMetric("foobar", "test metric", "t1", (config, now) -> 123);

        streamsMetricsImpl.removeAllThreadLevelMetrics("t1");

        MetricName removedName = this.metrics.metricName(
            "foobar", "stream-thread-metrics", Collections.singletonMap(THREAD_ID_TAG, "t1"));
        MatcherAssert.assertThat(this.metrics.metric(removedName), CoreMatchers.nullValue());
    }

    /**
     * An immutable thread-level metric must be registered in group {@code stream-thread-metrics}
     * under the thread-id tag and report the constant value it was created with.
     */
    @Test
    public void shouldAddThreadLevelImmutableMetric() {
        StreamsMetricsImpl streamsMetricsImpl = new StreamsMetricsImpl(this.metrics, THREAD_ID1, VERSION, this.time);
        streamsMetricsImpl.addThreadLevelImmutableMetric("foobar", "test metric", "t1", 123);

        MetricName expectedName = this.metrics.metricName(
            "foobar", "stream-thread-metrics", Collections.singletonMap(THREAD_ID_TAG, "t1"));
        MatcherAssert.assertThat(this.metrics.metric(expectedName), CoreMatchers.notNullValue());
        MatcherAssert.assertThat(this.metrics.metric(expectedName).metricValue(), Matchers.equalTo(123));
    }

    /** After {@code removeAllThreadLevelMetrics}, a previously added immutable thread-level metric must be gone. */
    @Test
    public void shouldCleanupThreadLevelImmutableMetric() {
        StreamsMetricsImpl streamsMetricsImpl = new StreamsMetricsImpl(this.metrics, THREAD_ID1, VERSION, this.time);
        streamsMetricsImpl.addThreadLevelImmutableMetric("foobar", "test metric", "t1", 123);

        streamsMetricsImpl.removeAllThreadLevelMetrics("t1");

        MetricName removedName = this.metrics.metricName(
            "foobar", "stream-thread-metrics", Collections.singletonMap(THREAD_ID_TAG, "t1"));
        MatcherAssert.assertThat(this.metrics.metric(removedName), CoreMatchers.nullValue());
    }
}
