package org.apache.kafka.streams.processor.internals;

import java.time.Duration;
import java.util.Objects;
import java.util.Properties;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.processor.api.FixedKeyRecord;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.test.MockKeyValueStore;
import org.apache.kafka.test.StreamsTestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.class */
public class AbstractProcessorContextTest {
    private final MockStreamsMetrics metrics = new MockStreamsMetrics(new Metrics());
    private final AbstractProcessorContext context = new TestProcessorContext(this.metrics);
    private final MockKeyValueStore stateStore = new MockKeyValueStore("store", false);
    private final Headers headers = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())});
    private final ProcessorRecordContext recordContext = new ProcessorRecordContext(10, System.currentTimeMillis(), 1, "foo", this.headers);

    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest$TestProcessorContext.class */
    private static class TestProcessorContext extends AbstractProcessorContext<Object, Object> {
        static Properties config = StreamsTestUtils.getStreamsConfig();

        TestProcessorContext(MockStreamsMetrics mockStreamsMetrics) {
            super(new TaskId(0, 0), new StreamsConfig(config), mockStreamsMetrics, new ThreadCache(new LogContext("name "), 0L, mockStreamsMetrics));
        }

        TestProcessorContext(MockStreamsMetrics mockStreamsMetrics, Properties properties) {
            super(new TaskId(0, 0), new StreamsConfig(properties), mockStreamsMetrics, new ThreadCache(new LogContext("name "), 0L, mockStreamsMetrics));
        }

        protected StateManager stateManager() {
            return new StateManagerStub();
        }

        public <S extends StateStore> S getStateStore(String str) {
            return null;
        }

        public Cancellable schedule(Duration duration, PunctuationType punctuationType, Punctuator punctuator) throws IllegalArgumentException {
            return null;
        }

        public <K, V> void forward(Record<K, V> record) {
        }

        public <K, V> void forward(Record<K, V> record, String str) {
        }

        public <K, V> void forward(K k, V v) {
        }

        public <K, V> void forward(K k, V v, To to) {
        }

        public void commit() {
        }

        public long currentStreamTimeMs() {
            throw new UnsupportedOperationException("this method is not supported in TestProcessorContext");
        }

        public void logChange(String str, Bytes bytes, byte[] bArr, long j, Position position) {
        }

        public void transitionToActive(StreamTask streamTask, RecordCollector recordCollector, ThreadCache threadCache) {
        }

        public void transitionToStandby(ThreadCache threadCache) {
        }

        public void registerCacheFlushListener(String str, ThreadCache.DirtyEntryFlushListener dirtyEntryFlushListener) {
        }

        public String changelogFor(String str) {
            return ProcessorStateManager.storeChangelogTopic(applicationId(), str, taskId().topologyName());
        }

        public <K, V> void forward(FixedKeyRecord<K, V> fixedKeyRecord) {
            forward(new Record<>(fixedKeyRecord.key(), fixedKeyRecord.value(), fixedKeyRecord.timestamp(), fixedKeyRecord.headers()));
        }

        public <K, V> void forward(FixedKeyRecord<K, V> fixedKeyRecord, String str) {
            forward((Record) new Record<>(fixedKeyRecord.key(), fixedKeyRecord.value(), fixedKeyRecord.timestamp(), fixedKeyRecord.headers()), str);
        }

        static {
            config.put("rocksdb.config.setter", RocksDBConfigSetter.class.getName());
            config.put("default.value.serde", Serdes.ByteArraySerde.class);
            config.put("default.key.serde", Serdes.ByteArraySerde.class);
            config.put("user.supplied.config", "user-supplied-value");
        }
    }

    @BeforeEach
    public void before() {
        this.context.setRecordContext(this.recordContext);
    }

    @Test
    public void shouldThrowIllegalStateExceptionOnRegisterWhenContextIsInitialized() {
        this.context.initialize();
        try {
            this.context.register(this.stateStore, (StateRestoreCallback) null);
            Assertions.fail("should throw illegal state exception when context already initialized");
        } catch (IllegalStateException e) {
        }
    }

    @Test
    public void shouldNotThrowIllegalStateExceptionOnRegisterWhenContextIsNotInitialized() {
        this.context.register(this.stateStore, (StateRestoreCallback) null);
    }

    @Test
    public void shouldThrowNullPointerOnRegisterIfStateStoreIsNull() {
        Assertions.assertThrows(NullPointerException.class, () -> {
            this.context.register((StateStore) null, (StateRestoreCallback) null);
        });
    }

    @Test
    public void shouldReturnNullTopicIfNoRecordContext() {
        this.context.setRecordContext((ProcessorRecordContext) null);
        MatcherAssert.assertThat(this.context.topic(), Matchers.is(CoreMatchers.nullValue()));
    }

    @Test
    public void shouldNotThrowNullPointerExceptionOnTopicIfRecordContextTopicIsNull() {
        this.context.setRecordContext(new ProcessorRecordContext(0L, 0L, 0, (String) null, new RecordHeaders()));
        MatcherAssert.assertThat(this.context.topic(), CoreMatchers.nullValue());
    }

    @Test
    public void shouldReturnTopicFromRecordContext() {
        MatcherAssert.assertThat(this.context.topic(), CoreMatchers.equalTo(this.recordContext.topic()));
    }

    @Test
    public void shouldReturnNullIfTopicEqualsNonExistTopic() {
        this.context.setRecordContext((ProcessorRecordContext) null);
        MatcherAssert.assertThat(this.context.topic(), CoreMatchers.nullValue());
    }

    @Test
    public void shouldReturnDummyPartitionIfNoRecordContext() {
        this.context.setRecordContext((ProcessorRecordContext) null);
        MatcherAssert.assertThat(Integer.valueOf(this.context.partition()), Matchers.is(-1));
    }

    @Test
    public void shouldReturnPartitionFromRecordContext() {
        MatcherAssert.assertThat(Integer.valueOf(this.context.partition()), CoreMatchers.equalTo(Integer.valueOf(this.recordContext.partition())));
    }

    @Test
    public void shouldThrowIllegalStateExceptionOnOffsetIfNoRecordContext() {
        this.context.setRecordContext((ProcessorRecordContext) null);
        try {
            this.context.offset();
        } catch (IllegalStateException e) {
        }
    }

    @Test
    public void shouldReturnOffsetFromRecordContext() {
        MatcherAssert.assertThat(Long.valueOf(this.context.offset()), CoreMatchers.equalTo(Long.valueOf(this.recordContext.offset())));
    }

    @Test
    public void shouldReturnDummyTimestampIfNoRecordContext() {
        this.context.setRecordContext((ProcessorRecordContext) null);
        MatcherAssert.assertThat(Long.valueOf(this.context.timestamp()), Matchers.is(0L));
    }

    @Test
    public void shouldReturnTimestampFromRecordContext() {
        MatcherAssert.assertThat(Long.valueOf(this.context.timestamp()), CoreMatchers.equalTo(Long.valueOf(this.recordContext.timestamp())));
    }

    @Test
    public void shouldReturnHeadersFromRecordContext() {
        MatcherAssert.assertThat(this.context.headers(), CoreMatchers.equalTo(this.recordContext.headers()));
    }

    @Test
    public void shouldReturnEmptyHeadersIfHeadersAreNotSet() {
        this.context.setRecordContext((ProcessorRecordContext) null);
        MatcherAssert.assertThat(this.context.headers(), Matchers.is(Matchers.emptyIterable()));
    }

    @Test
    public void appConfigsShouldReturnParsedValues() {
        MatcherAssert.assertThat(this.context.appConfigs().get("rocksdb.config.setter"), CoreMatchers.equalTo(RocksDBConfigSetter.class));
    }

    @Test
    public void appConfigsShouldReturnUnrecognizedValues() {
        MatcherAssert.assertThat(this.context.appConfigs().get("user.supplied.config"), CoreMatchers.equalTo("user-supplied-value"));
    }

    @Test
    public void shouldThrowErrorIfSerdeDefaultNotSet() {
        Properties streamsConfig = StreamsTestUtils.getStreamsConfig();
        streamsConfig.put("rocksdb.config.setter", RocksDBConfigSetter.class.getName());
        streamsConfig.put("user.supplied.config", "user-supplied-value");
        TestProcessorContext testProcessorContext = new TestProcessorContext(this.metrics, streamsConfig);
        Objects.requireNonNull(testProcessorContext);
        Assertions.assertThrows(ConfigException.class, testProcessorContext::keySerde);
        Objects.requireNonNull(testProcessorContext);
        Assertions.assertThrows(ConfigException.class, testProcessorContext::valueSerde);
    }
}
