package org.apache.kafka.streams.processor.internals;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InvalidPidMappingException;
import org.apache.kafka.common.errors.InvalidProducerEpochException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnknownProducerIdException;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils;
import org.apache.kafka.test.MockClientSupplier;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;

@ExtendWith({MockitoExtension.class})
@MockitoSettings(strictness = Strictness.STRICT_STUBS)
/* loaded from: input_file:org/apache/kafka/streams/processor/internals/StreamsProducerTest.class */
public class StreamsProducerTest {
    // Distinct numeric constants, one per producer timing category (buffer-pool wait, flush,
    // the transaction phases, metadata wait). They are not referenced in the visible part of
    // this file — presumably used as stubbed metric values further down; TODO confirm.
    private static final double BUFFER_POOL_WAIT_TIME = 1.0d;
    // NOTE(review): "TME" looks like a typo for "TIME"; left unchanged because usages may exist outside this view.
    private static final double FLUSH_TME = 2.0d;
    private static final double TXN_INIT_TIME = 3.0d;
    private static final double TXN_BEGIN_TIME = 4.0d;
    private static final double TXN_SEND_OFFSETS_TIME = 5.0d;
    private static final double TXN_COMMIT_TIME = 6.0d;
    private static final double TXN_ABORT_TIME = 7.0d;
    private static final double METADATA_WAIT_TIME = 8.0d;
    // Producers under test and their backing MockProducer instances; all six are (re)assigned in before().
    private StreamsProducer nonEosStreamsProducer;
    private MockProducer<byte[], byte[]> nonEosMockProducer;
    private StreamsProducer eosAlphaStreamsProducer;
    private MockProducer<byte[], byte[]> eosAlphaMockProducer;
    private StreamsProducer eosBetaStreamsProducer;
    private MockProducer<byte[], byte[]> eosBetaMockProducer;
    // Log prefix "test " — several tests expect messages ending in "[test]", presumably derived from this prefix.
    private final LogContext logContext = new LogContext("test ");
    private final String topic = AssignmentTestUtils.TOPIC_PREFIX;
    // Single-node cluster exposing exactly one partition (partition 0) of the test topic.
    private final Cluster cluster = new Cluster("cluster", Collections.singletonList(Node.noNode()), Collections.singletonList(new PartitionInfo(AssignmentTestUtils.TOPIC_PREFIX, 0, Node.noNode(), new Node[0], new Node[0])), Collections.emptySet(), Collections.emptySet());
    // One StreamsConfig per processing mode: no "processing.guarantee" entry, "exactly_once", and "exactly_once_v2".
    private final StreamsConfig nonEosConfig = new StreamsConfig(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("application.id", "appId"), Utils.mkEntry("bootstrap.servers", "dummy:1234")}));
    private final StreamsConfig eosAlphaConfig = new StreamsConfig(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("application.id", "appId"), Utils.mkEntry("bootstrap.servers", "dummy:1234"), Utils.mkEntry("processing.guarantee", "exactly_once")}));
    private final StreamsConfig eosBetaConfig = new StreamsConfig(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("application.id", "appId"), Utils.mkEntry("bootstrap.servers", "dummy:1234"), Utils.mkEntry("processing.guarantee", "exactly_once_v2")}));
    // Mockito mock of Time; nanoseconds() is stubbed at the end of before().
    private final Time mockTime = (Time) Mockito.mock(Time.class);
    // Pure Mockito producer used by the "forward call" / verify-style tests.
    final Producer<byte[], byte[]> mockedProducer = (Producer) Mockito.mock(Producer.class);
    // Client supplier that always hands out mockedProducer, regardless of the config map passed in.
    final KafkaClientSupplier clientSupplier = new MockClientSupplier() { // from class: org.apache.kafka.streams.processor.internals.StreamsProducerTest.1
        @Override // org.apache.kafka.test.MockClientSupplier
        public Producer<byte[], byte[]> getProducer(Map<String, Object> map) {
            return StreamsProducerTest.this.mockedProducer;
        }
    };
    // StreamsProducer instances wired to mockedProducer (at-least-once and exactly-once alpha with task 0_0).
    final StreamsProducer streamsProducerWithMock = new StreamsProducer(this.nonEosConfig, "threadId", this.clientSupplier, (TaskId) null, (UUID) null, this.logContext, this.mockTime);
    final StreamsProducer eosAlphaStreamsProducerWithMock = new StreamsProducer(this.eosAlphaConfig, "threadId", this.clientSupplier, new TaskId(0, 0), (UUID) null, this.logContext, this.mockTime);
    // One MockClientSupplier per processing mode; each records the producers it creates in its `producers` list.
    private final MockClientSupplier mockClientSupplier = new MockClientSupplier();
    private final MockClientSupplier eosAlphaMockClientSupplier = new MockClientSupplier();
    private final MockClientSupplier eosBetaMockClientSupplier = new MockClientSupplier();
    // Shared fixture record (partition 0, timestamp 0, empty key/value, empty headers) and a matching offsets map.
    private final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(AssignmentTestUtils.TOPIC_PREFIX, 0, 0L, new byte[0], new byte[0], new RecordHeaders());
    private final Map<TopicPartition, OffsetAndMetadata> offsetsAndMetadata = Utils.mkMap(new Map.Entry[]{Utils.mkEntry(new TopicPartition(AssignmentTestUtils.TOPIC_PREFIX, 0), new OffsetAndMetadata(0, (String) null))});

    /**
     * Builds the three producers under test (at-least-once, exactly-once alpha, exactly-once v2),
     * each backed by its own {@link MockClientSupplier}, and captures the {@link MockProducer}
     * that each supplier created. Both EOS producers have {@code initTransaction()} called on them.
     * The {@code mockTime.nanoseconds()} stub is installed last, after all producers exist.
     */
    @BeforeEach
    public void before() {
        final String threadId = "threadId-StreamThread-0";

        // at-least-once
        mockClientSupplier.setCluster(cluster);
        nonEosStreamsProducer =
            new StreamsProducer(nonEosConfig, threadId, mockClientSupplier, null, null, logContext, mockTime);
        nonEosMockProducer = mockClientSupplier.producers.get(0);

        // exactly-once alpha: identified by task id
        eosAlphaMockClientSupplier.setCluster(cluster);
        eosAlphaMockClientSupplier.setApplicationIdForProducer("appId");
        eosAlphaStreamsProducer =
            new StreamsProducer(eosAlphaConfig, threadId, eosAlphaMockClientSupplier, new TaskId(0, 0), null, logContext, mockTime);
        eosAlphaStreamsProducer.initTransaction();
        eosAlphaMockProducer = eosAlphaMockClientSupplier.producers.get(0);

        // exactly-once v2: identified by process id
        eosBetaMockClientSupplier.setCluster(cluster);
        eosBetaMockClientSupplier.setApplicationIdForProducer("appId");
        eosBetaStreamsProducer =
            new StreamsProducer(eosBetaConfig, threadId, eosBetaMockClientSupplier, null, UUID.randomUUID(), logContext, mockTime);
        eosBetaStreamsProducer.initTransaction();
        eosBetaMockProducer = eosBetaMockClientSupplier.producers.get(0);

        Mockito.when(mockTime.nanoseconds()).thenReturn(Time.SYSTEM.nanoseconds());
    }

    /** After a send starts a transaction, {@code close()} must clear the in-flight flag. */
    @Test
    public void shouldResetTransactionInFlightOnClose() {
        final ProducerRecord<byte[], byte[]> oneByteRecord =
            new ProducerRecord<>(AssignmentTestUtils.TOPIC_PREFIX, new byte[1]);

        eosBetaStreamsProducer.send(oneByteRecord, (metadata, error) -> { });
        MatcherAssert.assertThat(eosBetaStreamsProducer.transactionInFlight(), Matchers.is(true));

        eosBetaStreamsProducer.close();
        MatcherAssert.assertThat(eosBetaStreamsProducer.transactionInFlight(), Matchers.is(false));
    }

    /** After a send starts a transaction, {@code resetProducer()} must clear the in-flight flag. */
    @Test
    public void shouldResetTransactionInFlightOnReset() {
        final ProducerRecord<byte[], byte[]> oneByteRecord =
            new ProducerRecord<>(AssignmentTestUtils.TOPIC_PREFIX, new byte[1]);

        eosBetaStreamsProducer.send(oneByteRecord, (metadata, error) -> { });
        MatcherAssert.assertThat(eosBetaStreamsProducer.transactionInFlight(), Matchers.is(true));

        eosBetaStreamsProducer.resetProducer();
        MatcherAssert.assertThat(eosBetaStreamsProducer.transactionInFlight(), Matchers.is(false));
    }

    /**
     * Each {@link StreamsProducer} built in {@code before()} must have requested exactly one
     * producer from its client supplier. All three suppliers are checked, including the
     * exactly-once-v2 one.
     */
    @Test
    public void shouldCreateProducer() {
        MatcherAssert.assertThat(mockClientSupplier.producers.size(), Matchers.is(1));
        MatcherAssert.assertThat(eosAlphaMockClientSupplier.producers.size(), Matchers.is(1));
        MatcherAssert.assertThat(eosBetaMockClientSupplier.producers.size(), Matchers.is(1));
    }

    /** {@code partitionsFor} must return the very list instance the wrapped producer returns. */
    @Test
    public void shouldForwardCallToPartitionsFor() {
        final List<PartitionInfo> stubbedPartitions = Collections.emptyList();
        Mockito.when(mockedProducer.partitionsFor(AssignmentTestUtils.TOPIC_PREFIX)).thenReturn(stubbedPartitions);

        final List<PartitionInfo> actualPartitions =
            streamsProducerWithMock.partitionsFor(AssignmentTestUtils.TOPIC_PREFIX);

        MatcherAssert.assertThat(actualPartitions, Matchers.sameInstance(stubbedPartitions));
    }

    /** {@code flush()} must be delegated to the wrapped producer. */
    @Test
    public void shouldForwardCallToFlush() {
        streamsProducerWithMock.flush();

        Mockito.verify(mockedProducer).flush();
    }

    /** {@code metrics()} must return the very map instance the wrapped producer returns. */
    @Test
    public void shouldForwardCallToMetrics() {
        // Deliberately raw: the stubbed method's wildcard return type does not accept a typed map in thenReturn.
        final HashMap stubbedMetrics = new HashMap();
        Mockito.when(mockedProducer.metrics()).thenReturn(stubbedMetrics);

        Assertions.assertSame(stubbedMetrics, streamsProducerWithMock.metrics());
    }

    /** {@code close()} must be delegated to the wrapped producer. */
    @Test
    public void shouldForwardCallToClose() {
        streamsProducerWithMock.close();

        Mockito.verify(mockedProducer).close();
    }

    /** Constructing with a null config must fail with a descriptive {@link NullPointerException}. */
    @Test
    public void shouldFailIfStreamsConfigIsNull() {
        final NullPointerException thrown = Assertions.assertThrows(
            NullPointerException.class,
            () -> new StreamsProducer(null, "threadId", mockClientSupplier, new TaskId(0, 0), UUID.randomUUID(), logContext, mockTime)
        );

        MatcherAssert.assertThat(thrown.getMessage(), Matchers.is("config cannot be null"));
    }

    /** Constructing with a null thread id must fail with a descriptive {@link NullPointerException}. */
    @Test
    public void shouldFailIfThreadIdIsNull() {
        final NullPointerException thrown = Assertions.assertThrows(
            NullPointerException.class,
            () -> new StreamsProducer(nonEosConfig, null, mockClientSupplier, new TaskId(0, 0), UUID.randomUUID(), logContext, mockTime)
        );

        MatcherAssert.assertThat(thrown.getMessage(), Matchers.is("threadId cannot be null"));
    }

    /** Constructing with a null client supplier must fail with a descriptive {@link NullPointerException}. */
    @Test
    public void shouldFailIfClientSupplierIsNull() {
        final NullPointerException thrown = Assertions.assertThrows(
            NullPointerException.class,
            () -> new StreamsProducer(nonEosConfig, "threadId", null, new TaskId(0, 0), UUID.randomUUID(), logContext, mockTime)
        );

        MatcherAssert.assertThat(thrown.getMessage(), Matchers.is("clientSupplier cannot be null"));
    }

    /** Constructing with a null log context must fail with a descriptive {@link NullPointerException}. */
    @Test
    public void shouldFailIfLogContextIsNull() {
        final NullPointerException thrown = Assertions.assertThrows(
            NullPointerException.class,
            () -> new StreamsProducer(nonEosConfig, "threadId", mockClientSupplier, new TaskId(0, 0), UUID.randomUUID(), null, mockTime)
        );

        MatcherAssert.assertThat(thrown.getMessage(), Matchers.is("logContext cannot be null"));
    }

    /** {@code resetProducer()} on an at-least-once producer must be rejected as an illegal state. */
    @Test
    public void shouldFailOnResetProducerForAtLeastOnce() {
        final IllegalStateException thrown = Assertions.assertThrows(
            IllegalStateException.class,
            () -> nonEosStreamsProducer.resetProducer()
        );

        MatcherAssert.assertThat(
            thrown.getMessage(),
            Matchers.is("Expected eos-v2 to be enabled, but the processing mode was AT_LEAST_ONCE")
        );
    }

    /** {@code resetProducer()} on an exactly-once-alpha producer must be rejected as an illegal state. */
    @Test
    public void shouldFailOnResetProducerForExactlyOnceAlpha() {
        final IllegalStateException thrown = Assertions.assertThrows(
            IllegalStateException.class,
            () -> eosAlphaStreamsProducer.resetProducer()
        );

        MatcherAssert.assertThat(
            thrown.getMessage(),
            Matchers.is("Expected eos-v2 to be enabled, but the processing mode was EXACTLY_ONCE_ALPHA")
        );
    }

    /**
     * With "at_least_once" configured, constructing a {@link StreamsProducer} must not add a
     * "transactional.id" entry to the producer config map obtained from the (mocked) StreamsConfig.
     */
    @Test
    public void shouldNotSetTransactionIdIfEosDisabled() {
        final Map<String, Object> producerConfigs = new HashMap<>();
        final StreamsConfig mockConfig = Mockito.mock(StreamsConfig.class);
        Mockito.when(mockConfig.getProducerConfigs("threadId-producer")).thenReturn(producerConfigs);
        Mockito.when(mockConfig.getString("processing.guarantee")).thenReturn("at_least_once");

        new StreamsProducer(mockConfig, "threadId", mockClientSupplier, null, null, logContext, mockTime);

        Assertions.assertFalse(producerConfigs.containsKey("transactional.id"));
    }

    /** The at-least-once producer must report EOS as disabled. */
    @Test
    public void shouldNotHaveEosEnabledIfEosDisabled() {
        final boolean eosEnabled = nonEosStreamsProducer.eosEnabled();

        MatcherAssert.assertThat(eosEnabled, Matchers.is(false));
    }

    /** The at-least-once setup must leave the underlying mock producer without initialised transactions. */
    @Test
    public void shouldNotInitTxIfEosDisable() {
        final boolean initialized = nonEosMockProducer.transactionInitialized();

        MatcherAssert.assertThat(initialized, Matchers.is(false));
    }

    /** Sending through the at-least-once producer must not open a transaction on the mock producer. */
    @Test
    public void shouldNotBeginTxOnSendIfEosDisable() {
        nonEosStreamsProducer.send(record, null);

        MatcherAssert.assertThat(nonEosMockProducer.transactionInFlight(), Matchers.is(false));
    }

    /** A record sent through the at-least-once producer must show up, unchanged, in the mock's history. */
    @Test
    public void shouldForwardRecordOnSend() {
        nonEosStreamsProducer.send(record, null);

        final List<ProducerRecord<byte[], byte[]>> sent = nonEosMockProducer.history();
        MatcherAssert.assertThat(sent.size(), Matchers.is(1));
        MatcherAssert.assertThat(sent.get(0), Matchers.is(record));
    }

    /** {@code initTransaction()} on the at-least-once producer must be rejected as an illegal state. */
    @Test
    public void shouldFailOnInitTxIfEosDisabled() {
        final IllegalStateException thrown = Assertions.assertThrows(
            IllegalStateException.class,
            nonEosStreamsProducer::initTransaction
        );

        MatcherAssert.assertThat(thrown.getMessage(), Matchers.is("Exactly-once is not enabled [test]"));
    }

    /**
     * A {@link KafkaException} raised by the underlying send must surface as a
     * {@link StreamsException} that keeps the original exception as its cause.
     */
    @Test
    public void shouldThrowStreamsExceptionOnSendError() {
        nonEosMockProducer.sendException = new KafkaException("KABOOM!");

        final StreamsException thrown = Assertions.assertThrows(
            StreamsException.class,
            () -> nonEosStreamsProducer.send(record, null)
        );

        MatcherAssert.assertThat(thrown.getCause(), Matchers.is(nonEosMockProducer.sendException));
        MatcherAssert.assertThat(
            thrown.getMessage(),
            Matchers.is("Error encountered trying to send record to topic topic [test]")
        );
    }

    /** A plain {@link RuntimeException} raised by the underlying send must propagate with its message intact. */
    @Test
    public void shouldFailOnSendFatal() {
        nonEosMockProducer.sendException = new RuntimeException("KABOOM!");

        final RuntimeException thrown = Assertions.assertThrows(
            RuntimeException.class,
            () -> nonEosStreamsProducer.send(record, null)
        );

        MatcherAssert.assertThat(thrown.getMessage(), Matchers.is("KABOOM!"));
    }

    /** {@code commitTransaction} on the at-least-once producer must be rejected as an illegal state. */
    @Test
    public void shouldFailOnCommitIfEosDisabled() {
        final IllegalStateException thrown = Assertions.assertThrows(
            IllegalStateException.class,
            () -> nonEosStreamsProducer.commitTransaction(null, new ConsumerGroupMetadata("appId"))
        );

        MatcherAssert.assertThat(thrown.getMessage(), Matchers.is("Exactly-once is not enabled [test]"));
    }

    /** {@code abortTransaction()} on the at-least-once producer must be rejected as an illegal state. */
    @Test
    public void shouldFailOnAbortIfEosDisabled() {
        final IllegalStateException thrown = Assertions.assertThrows(
            IllegalStateException.class,
            nonEosStreamsProducer::abortTransaction
        );

        MatcherAssert.assertThat(thrown.getMessage(), Matchers.is("Exactly-once is not enabled [test]"));
    }

    /** The exactly-once-alpha producer must report EOS as enabled. */
    @Test
    public void shouldEnableEosIfEosAlphaEnabled() {
        final boolean eosEnabled = eosAlphaStreamsProducer.eosEnabled();

        MatcherAssert.assertThat(eosEnabled, Matchers.is(true));
    }

    /** The exactly-once-v2 producer must report EOS as enabled. */
    @Test
    public void shouldEnableEosIfEosBetaEnabled() {
        final boolean eosEnabled = eosBetaStreamsProducer.eosEnabled();

        MatcherAssert.assertThat(eosEnabled, Matchers.is(true));
    }

    /**
     * With "exactly_once" configured and task 0_0, constructing a {@link StreamsProducer} must put
     * "transactional.id" = "appId-0_0" into the producer config map.
     */
    @Test
    public void shouldSetTransactionIdUsingTaskIdIfEosAlphaEnabled() {
        final Map<String, Object> producerConfigs = new HashMap<>();
        final StreamsConfig mockConfig = Mockito.mock(StreamsConfig.class);
        Mockito.when(mockConfig.getProducerConfigs("threadId-0_0-producer")).thenReturn(producerConfigs);
        Mockito.when(mockConfig.getString("application.id")).thenReturn("appId");
        Mockito.when(mockConfig.getString("processing.guarantee")).thenReturn("exactly_once");

        new StreamsProducer(mockConfig, "threadId", eosAlphaMockClientSupplier, new TaskId(0, 0), null, logContext, mockTime);

        Assertions.assertEquals("appId-0_0", producerConfigs.get("transactional.id"));
    }

    /**
     * With "exactly_once_v2" configured, constructing a {@link StreamsProducer} must put
     * "transactional.id" = "appId-&lt;processId&gt;-0" into the producer config map.
     * Uses the exactly-once-v2 client supplier so the test does not add producers to the
     * alpha supplier's list; both suppliers are configured identically in {@code before()}.
     */
    @Test
    public void shouldSetTransactionIdUsingProcessIdIfEosV2Enabled() {
        final UUID processId = UUID.randomUUID();
        final Map<String, Object> producerConfigs = new HashMap<>();
        final StreamsConfig mockConfig = Mockito.mock(StreamsConfig.class);
        Mockito.when(mockConfig.getProducerConfigs("threadId-StreamThread-0-producer")).thenReturn(producerConfigs);
        Mockito.when(mockConfig.getString("application.id")).thenReturn("appId");
        Mockito.when(mockConfig.getString("processing.guarantee")).thenReturn("exactly_once_v2");

        new StreamsProducer(mockConfig, "threadId-StreamThread-0", eosBetaMockClientSupplier, null, processId, logContext, mockTime);

        Assertions.assertEquals("appId-" + processId + "-0", producerConfigs.get("transactional.id"));
    }

    /**
     * Asserts that the exactly-once-alpha producer reports EOS as enabled.
     * NOTE(review): the "shouldNot" in the method name contradicts the assertion, and the check
     * duplicates shouldEnableEosIfEosAlphaEnabled; the name is kept to avoid changing test identity.
     */
    @Test
    public void shouldNotHaveEosEnabledIfEosAlphaEnable() {
        final boolean eosEnabled = eosAlphaStreamsProducer.eosEnabled();

        MatcherAssert.assertThat(eosEnabled, Matchers.is(true));
    }

    /** The exactly-once-v2 producer must report EOS as enabled. */
    @Test
    public void shouldHaveEosEnabledIfEosBetaEnabled() {
        final boolean eosEnabled = eosBetaStreamsProducer.eosEnabled();

        MatcherAssert.assertThat(eosEnabled, Matchers.is(true));
    }

    /** After {@code before()}, the exactly-once-alpha mock producer must have its transactions initialised. */
    @Test
    public void shouldInitTxOnEos() {
        final boolean initialized = eosAlphaMockProducer.transactionInitialized();

        MatcherAssert.assertThat(initialized, Matchers.is(true));
    }

    /** The first send on an EOS producer must open a transaction on the mock producer. */
    @Test
    public void shouldBeginTxOnEosSend() {
        eosAlphaStreamsProducer.send(record, null);

        MatcherAssert.assertThat(eosAlphaMockProducer.transactionInFlight(), Matchers.is(true));
    }

    /** Two consecutive sends must leave a transaction in flight holding both uncommitted records. */
    @Test
    public void shouldContinueTxnSecondEosSend() {
        for (int i = 0; i < 2; i++) {
            eosAlphaStreamsProducer.send(record, null);
        }

        MatcherAssert.assertThat(eosAlphaMockProducer.transactionInFlight(), Matchers.is(true));
        MatcherAssert.assertThat(eosAlphaMockProducer.uncommittedRecords().size(), Matchers.is(2));
    }

    /**
     * An EOS send must hand the record to the mock producer as an uncommitted record:
     * a transaction is in flight, history is still empty, and the single uncommitted record is ours.
     */
    @Test
    public void shouldForwardRecordButNotCommitOnEosSend() {
        eosAlphaStreamsProducer.send(record, null);

        MatcherAssert.assertThat(eosAlphaMockProducer.transactionInFlight(), Matchers.is(true));
        MatcherAssert.assertThat(eosAlphaMockProducer.history().isEmpty(), Matchers.is(true));

        final List<ProducerRecord<byte[], byte[]>> pending = eosAlphaMockProducer.uncommittedRecords();
        MatcherAssert.assertThat(pending.size(), Matchers.is(1));
        MatcherAssert.assertThat(pending.get(0), Matchers.is(record));
    }

    /**
     * Committing without a prior send must still begin a transaction: the wrapped producer sees
     * initTransactions, beginTransaction, sendOffsetsToTransaction and commitTransaction.
     */
    @Test
    public void shouldBeginTxOnEosCommit() {
        final ConsumerGroupMetadata groupMetadata = new ConsumerGroupMetadata("appId");

        eosAlphaStreamsProducerWithMock.initTransaction();
        eosAlphaStreamsProducerWithMock.commitTransaction(offsetsAndMetadata, groupMetadata);

        Mockito.verify(mockedProducer).initTransactions();
        Mockito.verify(mockedProducer).beginTransaction();
        Mockito.verify(mockedProducer).sendOffsetsToTransaction(offsetsAndMetadata, new ConsumerGroupMetadata("appId"));
        Mockito.verify(mockedProducer).commitTransaction();
    }

    /** Committing on an EOS producer must send the offsets to the transaction. */
    @Test
    public void shouldSendOffsetToTxOnEosCommit() {
        final ConsumerGroupMetadata groupMetadata = new ConsumerGroupMetadata("appId");

        eosAlphaStreamsProducer.commitTransaction(offsetsAndMetadata, groupMetadata);

        MatcherAssert.assertThat(eosAlphaMockProducer.sentOffsets(), Matchers.is(true));
    }

    /**
     * After send + commit, the transaction is closed, nothing is left uncommitted, and both the
     * record and the offsets (keyed by group id "appId") appear in the mock producer's history.
     */
    @Test
    public void shouldCommitTxOnEosCommit() {
        eosAlphaStreamsProducer.send(record, null);
        MatcherAssert.assertThat(eosAlphaMockProducer.transactionInFlight(), Matchers.is(true));

        eosAlphaStreamsProducer.commitTransaction(offsetsAndMetadata, new ConsumerGroupMetadata("appId"));

        // transaction state is cleared
        MatcherAssert.assertThat(eosAlphaMockProducer.transactionInFlight(), Matchers.is(false));
        MatcherAssert.assertThat(eosAlphaMockProducer.uncommittedRecords().isEmpty(), Matchers.is(true));
        MatcherAssert.assertThat(eosAlphaMockProducer.uncommittedOffsets().isEmpty(), Matchers.is(true));

        // committed record
        MatcherAssert.assertThat(eosAlphaMockProducer.history().size(), Matchers.is(1));
        MatcherAssert.assertThat(eosAlphaMockProducer.history().get(0), Matchers.is(record));

        // committed offsets
        MatcherAssert.assertThat(eosAlphaMockProducer.consumerGroupOffsetsHistory().size(), Matchers.is(1));
        MatcherAssert.assertThat(
            eosAlphaMockProducer.consumerGroupOffsetsHistory().get(0).get("appId"),
            Matchers.is(offsetsAndMetadata)
        );
    }

    /**
     * For exactly-once alpha, init + send + commit must drive the wrapped producer through
     * initTransactions, beginTransaction, sendOffsetsToTransaction (with the given group metadata)
     * and commitTransaction.
     */
    @Test
    public void shouldCommitTxWithApplicationIdOnEosAlphaCommit() {
        Mockito.when(mockedProducer.send(record, null)).thenReturn(null);

        eosAlphaStreamsProducerWithMock.initTransaction();
        eosAlphaStreamsProducerWithMock.send(record, null);
        eosAlphaStreamsProducerWithMock.commitTransaction(null, new ConsumerGroupMetadata("appId"));

        Mockito.verify(mockedProducer).initTransactions();
        Mockito.verify(mockedProducer).beginTransaction();
        Mockito.verify(mockedProducer).sendOffsetsToTransaction(null, new ConsumerGroupMetadata("appId"));
        Mockito.verify(mockedProducer).commitTransaction();
    }

    /**
     * For exactly-once v2, init + send + commit must drive the wrapped producer through
     * initTransactions, beginTransaction, sendOffsetsToTransaction (with the given group metadata)
     * and commitTransaction.
     */
    @Test
    public void shouldCommitTxWithConsumerGroupMetadataOnEosBetaCommit() {
        Mockito.when(mockedProducer.send(record, null)).thenReturn(null);
        final StreamsProducer eosBetaProducerWithMock = new StreamsProducer(
            eosBetaConfig,
            "threadId-StreamThread-0",
            clientSupplier,
            null,
            UUID.randomUUID(),
            logContext,
            mockTime
        );

        eosBetaProducerWithMock.initTransaction();
        eosBetaProducerWithMock.send(record, null);
        eosBetaProducerWithMock.commitTransaction(null, new ConsumerGroupMetadata("appId"));

        Mockito.verify(mockedProducer).initTransactions();
        Mockito.verify(mockedProducer).beginTransaction();
        Mockito.verify(mockedProducer).sendOffsetsToTransaction(null, new ConsumerGroupMetadata("appId"));
        Mockito.verify(mockedProducer).commitTransaction();
    }

    /**
     * After send + abort, the transaction is closed and the mock producer holds no uncommitted
     * records or offsets and no committed history.
     */
    @Test
    public void shouldAbortTxOnEosAbort() {
        // given: one record pending inside an open transaction
        eosAlphaStreamsProducer.send(record, null);
        MatcherAssert.assertThat(eosAlphaMockProducer.transactionInFlight(), Matchers.is(true));
        MatcherAssert.assertThat(eosAlphaMockProducer.uncommittedRecords().size(), Matchers.is(1));
        MatcherAssert.assertThat(eosAlphaMockProducer.uncommittedRecords().get(0), Matchers.is(record));

        // when
        eosAlphaStreamsProducer.abortTransaction();

        // then: everything is discarded
        MatcherAssert.assertThat(eosAlphaMockProducer.transactionInFlight(), Matchers.is(false));
        MatcherAssert.assertThat(eosAlphaMockProducer.uncommittedRecords().isEmpty(), Matchers.is(true));
        MatcherAssert.assertThat(eosAlphaMockProducer.uncommittedOffsets().isEmpty(), Matchers.is(true));
        MatcherAssert.assertThat(eosAlphaMockProducer.history().isEmpty(), Matchers.is(true));
        MatcherAssert.assertThat(eosAlphaMockProducer.consumerGroupOffsetsHistory().isEmpty(), Matchers.is(true));
    }

    /**
     * Aborting when no transaction was ever begun must not reach the wrapped producer:
     * {@code initTransactions()} is the only transactional call expected, and
     * {@code abortTransaction()} is explicitly verified as never invoked so the test
     * fails if the abort is forwarded anyway.
     */
    @Test
    public void shouldSkipAbortTxOnEosAbortIfNotTxInFlight() {
        eosAlphaStreamsProducerWithMock.initTransaction();
        eosAlphaStreamsProducerWithMock.abortTransaction();

        Mockito.verify(mockedProducer).initTransactions();
        Mockito.verify(mockedProducer, Mockito.never()).abortTransaction();
    }

    /** Exactly-once alpha requires a task id; a null one must fail with a descriptive {@link NullPointerException}. */
    @Test
    public void shouldFailIfTaskIdIsNullForEosAlpha() {
        final NullPointerException thrown = Assertions.assertThrows(
            NullPointerException.class,
            () -> new StreamsProducer(eosAlphaConfig, "threadId", mockClientSupplier, null, UUID.randomUUID(), logContext, mockTime)
        );

        MatcherAssert.assertThat(thrown.getMessage(), Matchers.is("taskId cannot be null for exactly-once alpha"));
    }

    /** Exactly-once v2 requires a process id; a null one must fail with a descriptive {@link NullPointerException}. */
    @Test
    public void shouldFailIfProcessIdNullForEosBeta() {
        final NullPointerException thrown = Assertions.assertThrows(
            NullPointerException.class,
            () -> new StreamsProducer(eosBetaConfig, "threadId", mockClientSupplier, new TaskId(0, 0), null, logContext, mockTime)
        );

        MatcherAssert.assertThat(thrown.getMessage(), Matchers.is("processId cannot be null for exactly-once v2"));
    }

    /**
     * A {@link TimeoutException} raised by the underlying {@code initTransactions()} must be
     * rethrown as-is. The EOS-alpha producer here is deliberately wired to
     * {@code nonEosMockProducer}, which has been primed with the exception.
     */
    @Test
    public void shouldThrowTimeoutExceptionOnEosInitTxTimeout() {
        nonEosMockProducer.initTransactionException = new TimeoutException("KABOOM!");
        final KafkaClientSupplier primedSupplier = new MockClientSupplier() {
            @Override
            public Producer<byte[], byte[]> getProducer(final Map<String, Object> config) {
                return StreamsProducerTest.this.nonEosMockProducer;
            }
        };
        final StreamsProducer producerUnderTest =
            new StreamsProducer(eosAlphaConfig, "threadId", primedSupplier, new TaskId(0, 0), null, logContext, mockTime);

        final TimeoutException thrown =
            Assertions.assertThrows(TimeoutException.class, producerUnderTest::initTransaction);

        MatcherAssert.assertThat(thrown.getMessage(), Matchers.is("KABOOM!"));
    }

    /** Sending on an exactly-once-alpha producer whose {@code initTransaction()} was never called must fail. */
    @Test
    public void shouldFailOnMaybeBeginTransactionIfTransactionsNotInitializedForExactlyOnceAlpha() {
        final StreamsProducer uninitializedProducer =
            new StreamsProducer(eosAlphaConfig, "threadId", eosAlphaMockClientSupplier, new TaskId(0, 0), null, logContext, mockTime);

        final IllegalStateException thrown = Assertions.assertThrows(
            IllegalStateException.class,
            () -> uninitializedProducer.send(record, null)
        );

        MatcherAssert.assertThat(thrown.getMessage(), Matchers.is("MockProducer hasn't been initialized for transactions."));
    }

    /** Sending on an exactly-once-v2 producer whose {@code initTransaction()} was never called must fail. */
    @Test
    public void shouldFailOnMaybeBeginTransactionIfTransactionsNotInitializedForExactlyOnceBeta() {
        final StreamsProducer uninitializedProducer = new StreamsProducer(
            eosBetaConfig,
            "threadId-StreamThread-0",
            eosBetaMockClientSupplier,
            null,
            UUID.randomUUID(),
            logContext,
            mockTime
        );

        final IllegalStateException thrown = Assertions.assertThrows(
            IllegalStateException.class,
            () -> uninitializedProducer.send(record, null)
        );

        MatcherAssert.assertThat(thrown.getMessage(), Matchers.is("MockProducer hasn't been initialized for transactions."));
    }

    /**
     * A {@link KafkaException} raised by the underlying {@code initTransactions()} must surface as
     * a {@link StreamsException} that keeps the original exception as its cause. The EOS-alpha
     * producer here is deliberately wired to {@code nonEosMockProducer}, primed with the exception.
     */
    @Test
    public void shouldThrowStreamsExceptionOnEosInitError() {
        nonEosMockProducer.initTransactionException = new KafkaException("KABOOM!");
        final KafkaClientSupplier primedSupplier = new MockClientSupplier() {
            @Override
            public Producer<byte[], byte[]> getProducer(final Map<String, Object> config) {
                return StreamsProducerTest.this.nonEosMockProducer;
            }
        };
        final StreamsProducer producerUnderTest =
            new StreamsProducer(eosAlphaConfig, "threadId", primedSupplier, new TaskId(0, 0), null, logContext, mockTime);

        final StreamsException thrown =
            Assertions.assertThrows(StreamsException.class, producerUnderTest::initTransaction);

        MatcherAssert.assertThat(thrown.getCause(), Matchers.is(nonEosMockProducer.initTransactionException));
        MatcherAssert.assertThat(
            thrown.getMessage(),
            Matchers.is("Error encountered trying to initialize transactions [test]")
        );
    }

    /**
     * A plain {@link RuntimeException} raised by the underlying {@code initTransactions()} must
     * propagate with its message intact. The EOS-alpha producer here is deliberately wired to
     * {@code nonEosMockProducer}, primed with the exception.
     */
    @Test
    public void shouldFailOnEosInitFatal() {
        nonEosMockProducer.initTransactionException = new RuntimeException("KABOOM!");
        final KafkaClientSupplier primedSupplier = new MockClientSupplier() {
            @Override
            public Producer<byte[], byte[]> getProducer(final Map<String, Object> config) {
                return StreamsProducerTest.this.nonEosMockProducer;
            }
        };
        final StreamsProducer producerUnderTest =
            new StreamsProducer(eosAlphaConfig, "threadId", primedSupplier, new TaskId(0, 0), null, logContext, mockTime);

        final RuntimeException thrown =
            Assertions.assertThrows(RuntimeException.class, producerUnderTest::initTransaction);

        MatcherAssert.assertThat(thrown.getMessage(), Matchers.is("KABOOM!"));
    }

    /** If the mock producer is fenced, the send that tries to begin a transaction must raise {@link TaskMigratedException}. */
    @Test
    public void shouldThrowTaskMigrateExceptionOnEosBeginTxnFenced() {
        eosAlphaMockProducer.fenceProducer();

        final TaskMigratedException thrown = Assertions.assertThrows(
            TaskMigratedException.class,
            () -> eosAlphaStreamsProducer.send(null, null)
        );

        MatcherAssert.assertThat(
            thrown.getMessage(),
            Matchers.is("Producer got fenced trying to begin a new transaction [test]; it means all tasks belonging to this thread should be migrated.")
        );
    }

    /**
     * A {@link KafkaException} raised while beginning a transaction must surface as a
     * {@link StreamsException} that keeps the original exception as its cause.
     */
    @Test
    public void shouldThrowTaskMigrateExceptionOnEosBeginTxnError() {
        eosAlphaMockProducer.beginTransactionException = new KafkaException("KABOOM!");

        final StreamsException thrown = Assertions.assertThrows(
            StreamsException.class,
            () -> eosAlphaStreamsProducer.send(null, null)
        );

        MatcherAssert.assertThat(thrown.getCause(), Matchers.is(eosAlphaMockProducer.beginTransactionException));
        MatcherAssert.assertThat(
            thrown.getMessage(),
            Matchers.is("Error encountered trying to begin a new transaction [test]")
        );
    }

    /** A plain {@link RuntimeException} raised while beginning a transaction must propagate with its message intact. */
    @Test
    public void shouldFailOnEosBeginTxnFatal() {
        eosAlphaMockProducer.beginTransactionException = new RuntimeException("KABOOM!");

        final RuntimeException thrown = Assertions.assertThrows(
            RuntimeException.class,
            () -> eosAlphaStreamsProducer.send(null, null)
        );

        MatcherAssert.assertThat(thrown.getMessage(), Matchers.is("KABOOM!"));
    }

    /** A send failing with a wrapped {@link ProducerFencedException} must be reported as a task migration. */
    @Test
    public void shouldThrowTaskMigratedExceptionOnEosSendProducerFenced() {
        final ProducerFencedException fencingCause = new ProducerFencedException("KABOOM!");

        testThrowTaskMigratedExceptionOnEosSend(fencingCause);
    }

    /** A send failing with a wrapped {@link InvalidPidMappingException} must be reported as a task migration. */
    @Test
    public void shouldThrowTaskMigratedExceptionOnEosSendPInvalidPidMapping() {
        final InvalidPidMappingException fencingCause = new InvalidPidMappingException("KABOOM!");

        testThrowTaskMigratedExceptionOnEosSend(fencingCause);
    }

    /** A send failing with a wrapped {@link InvalidProducerEpochException} must be reported as a task migration. */
    @Test
    public void shouldThrowTaskMigratedExceptionOnEosSendInvalidEpoch() {
        final InvalidProducerEpochException fencingCause = new InvalidProducerEpochException("KABOOM!");

        testThrowTaskMigratedExceptionOnEosSend(fencingCause);
    }

    /**
     * Shared scenario for the EOS send tests: the mock producer's {@code sendException} is set to
     * a {@link KafkaException} wrapping {@code runtimeException}, and sending a record is then
     * expected to throw a {@link TaskMigratedException} whose cause is {@code runtimeException}.
     *
     * @param runtimeException the failure wrapped inside the mock producer's send exception
     */
    private void testThrowTaskMigratedExceptionOnEosSend(final RuntimeException runtimeException) {
        final KafkaException wrappedFailure = new KafkaException(runtimeException);
        this.eosAlphaMockProducer.sendException = wrappedFailure;

        final TaskMigratedException migrated = Assertions.assertThrows(
            TaskMigratedException.class,
            () -> this.eosAlphaStreamsProducer.send(this.record, null)
        );

        // The cause must be the inner exception, not the KafkaException wrapper.
        MatcherAssert.assertThat(migrated.getCause(), Matchers.is(runtimeException));
        MatcherAssert.assertThat(
            migrated.getMessage(),
            Matchers.is("Producer got fenced trying to send a record [test]; it means all tasks belonging to this thread should be migrated.")
        );
    }

    /**
     * Runs the shared EOS send scenario with an {@link UnknownProducerIdException} as the wrapped
     * failure: the mock's send exception wraps it, and a {@link TaskMigratedException} with that
     * cause and the "got fenced trying to send a record" message is expected.
     */
    @Test
    public void shouldThrowTaskMigratedExceptionOnEosSendUnknownPid() {
        final RuntimeException unknownPid = new UnknownProducerIdException("KABOOM!");
        testThrowTaskMigratedExceptionOnEosSend(unknownPid);
    }

    /** Runs the shared EOS send-offsets scenario with a {@link ProducerFencedException}. */
    @Test
    public void shouldThrowTaskMigrateExceptionOnEosSendOffsetProducerFenced() {
        final RuntimeException fenced = new ProducerFencedException("KABOOM!");
        testThrowTaskMigrateExceptionOnEosSendOffset(fenced);
    }

    /** Runs the shared EOS send-offsets scenario with an {@link InvalidPidMappingException}. */
    @Test
    public void shouldThrowTaskMigrateExceptionOnEosSendOffsetInvalidPidMapping() {
        final RuntimeException invalidPidMapping = new InvalidPidMappingException("KABOOM!");
        testThrowTaskMigrateExceptionOnEosSendOffset(invalidPidMapping);
    }

    /** Runs the shared EOS send-offsets scenario with an {@link InvalidProducerEpochException}. */
    @Test
    public void shouldThrowTaskMigrateExceptionOnEosSendOffsetInvalidEpoch() {
        final RuntimeException invalidEpoch = new InvalidProducerEpochException("KABOOM!");
        testThrowTaskMigrateExceptionOnEosSendOffset(invalidEpoch);
    }

    /**
     * Shared scenario for the EOS send-offsets tests: {@code runtimeException} is installed as the
     * mock producer's {@code sendOffsetsToTransactionException}, and committing a transaction
     * (with {@code null} offsets) is expected to throw a {@link TaskMigratedException} whose cause
     * is the installed exception.
     *
     * @param runtimeException the failure installed on the mock producer
     */
    private void testThrowTaskMigrateExceptionOnEosSendOffset(final RuntimeException runtimeException) {
        this.eosAlphaMockProducer.sendOffsetsToTransactionException = runtimeException;

        final TaskMigratedException migrated = Assertions.assertThrows(
            TaskMigratedException.class,
            () -> this.eosAlphaStreamsProducer.commitTransaction(null, new ConsumerGroupMetadata("appId"))
        );

        MatcherAssert.assertThat(
            migrated.getCause(),
            Matchers.is(this.eosAlphaMockProducer.sendOffsetsToTransactionException)
        );
        MatcherAssert.assertThat(
            migrated.getMessage(),
            Matchers.is("Producer got fenced trying to commit a transaction [test]; it means all tasks belonging to this thread should be migrated.")
        );
    }

    /**
     * With a {@link KafkaException} installed as the mock producer's
     * {@code sendOffsetsToTransactionException}, committing a transaction is expected to throw a
     * {@link StreamsException} whose cause is that exception.
     */
    @Test
    public void shouldThrowStreamsExceptionOnEosSendOffsetError() {
        this.eosAlphaMockProducer.sendOffsetsToTransactionException = new KafkaException("KABOOM!");

        final StreamsException thrown = Assertions.assertThrows(
            StreamsException.class,
            () -> this.eosAlphaStreamsProducer.commitTransaction(null, new ConsumerGroupMetadata("appId"))
        );

        MatcherAssert.assertThat(
            thrown.getCause(),
            Matchers.is(this.eosAlphaMockProducer.sendOffsetsToTransactionException)
        );
        MatcherAssert.assertThat(
            thrown.getMessage(),
            Matchers.is("Error encountered trying to commit a transaction [test]")
        );
    }

    /**
     * With a plain {@link RuntimeException} installed as the mock producer's
     * {@code sendOffsetsToTransactionException}, committing a transaction is expected to throw a
     * {@code RuntimeException} carrying the message {@code "KABOOM!"}.
     */
    @Test
    public void shouldFailOnEosSendOffsetFatal() {
        this.eosAlphaMockProducer.sendOffsetsToTransactionException = new RuntimeException("KABOOM!");

        final RuntimeException thrown = Assertions.assertThrows(
            RuntimeException.class,
            () -> this.eosAlphaStreamsProducer.commitTransaction(null, new ConsumerGroupMetadata("appId"))
        );

        MatcherAssert.assertThat(thrown.getMessage(), Matchers.is("KABOOM!"));
    }

    /** Runs the shared EOS commit scenario with a {@link ProducerFencedException}. */
    @Test
    public void shouldThrowTaskMigratedExceptionOnEosCommitWithProducerFenced() {
        final RuntimeException fenced = new ProducerFencedException("KABOOM!");
        testThrowTaskMigratedExceptionOnEos(fenced);
    }

    /** Runs the shared EOS commit scenario with an {@link InvalidPidMappingException}. */
    @Test
    public void shouldThrowTaskMigratedExceptionOnEosCommitWithInvalidPidMapping() {
        final RuntimeException invalidPidMapping = new InvalidPidMappingException("KABOOM!");
        testThrowTaskMigratedExceptionOnEos(invalidPidMapping);
    }

    /** Runs the shared EOS commit scenario with an {@link InvalidProducerEpochException}. */
    @Test
    public void shouldThrowTaskMigratedExceptionOnEosCommitWithInvalidEpoch() {
        final RuntimeException invalidEpoch = new InvalidProducerEpochException("KABOOM!");
        testThrowTaskMigratedExceptionOnEos(invalidEpoch);
    }

    /**
     * Shared scenario for the EOS commit tests: {@code runtimeException} is installed as the mock
     * producer's {@code commitTransactionException}, and committing the fixture offsets is
     * expected to throw a {@link TaskMigratedException} whose cause is the installed exception.
     * The mock must also report that offsets were sent.
     *
     * @param runtimeException the failure installed on the mock producer
     */
    private void testThrowTaskMigratedExceptionOnEos(final RuntimeException runtimeException) {
        this.eosAlphaMockProducer.commitTransactionException = runtimeException;

        final TaskMigratedException migrated = Assertions.assertThrows(
            TaskMigratedException.class,
            () -> this.eosAlphaStreamsProducer.commitTransaction(
                this.offsetsAndMetadata,
                new ConsumerGroupMetadata("appId")
            )
        );

        final boolean offsetsSent = this.eosAlphaMockProducer.sentOffsets();
        MatcherAssert.assertThat(offsetsSent, Matchers.is(true));
        MatcherAssert.assertThat(
            migrated.getCause(),
            Matchers.is(this.eosAlphaMockProducer.commitTransactionException)
        );
        MatcherAssert.assertThat(
            migrated.getMessage(),
            Matchers.is("Producer got fenced trying to commit a transaction [test]; it means all tasks belonging to this thread should be migrated.")
        );
    }

    /**
     * With a {@link KafkaException} installed as the mock producer's
     * {@code commitTransactionException}, committing the fixture offsets is expected to throw a
     * {@link StreamsException} whose cause is that exception. The mock must also report that
     * offsets were sent.
     */
    @Test
    public void shouldThrowStreamsExceptionOnEosCommitTxError() {
        this.eosAlphaMockProducer.commitTransactionException = new KafkaException("KABOOM!");

        final StreamsException thrown = Assertions.assertThrows(
            StreamsException.class,
            () -> this.eosAlphaStreamsProducer.commitTransaction(
                this.offsetsAndMetadata,
                new ConsumerGroupMetadata("appId")
            )
        );

        final boolean offsetsSent = this.eosAlphaMockProducer.sentOffsets();
        MatcherAssert.assertThat(offsetsSent, Matchers.is(true));
        MatcherAssert.assertThat(
            thrown.getCause(),
            Matchers.is(this.eosAlphaMockProducer.commitTransactionException)
        );
        MatcherAssert.assertThat(
            thrown.getMessage(),
            Matchers.is("Error encountered trying to commit a transaction [test]")
        );
    }

    /**
     * With a plain {@link RuntimeException} installed as the mock producer's
     * {@code commitTransactionException}, committing the fixture offsets is expected to throw a
     * {@code RuntimeException} carrying the message {@code "KABOOM!"}. The mock must also report
     * that offsets were sent.
     */
    @Test
    public void shouldFailOnEosCommitTxFatal() {
        this.eosAlphaMockProducer.commitTransactionException = new RuntimeException("KABOOM!");

        final RuntimeException thrown = Assertions.assertThrows(
            RuntimeException.class,
            () -> this.eosAlphaStreamsProducer.commitTransaction(
                this.offsetsAndMetadata,
                new ConsumerGroupMetadata("appId")
            )
        );

        final boolean offsetsSent = this.eosAlphaMockProducer.sentOffsets();
        MatcherAssert.assertThat(offsetsSent, Matchers.is(true));
        MatcherAssert.assertThat(thrown.getMessage(), Matchers.is("KABOOM!"));
    }

    /** Runs the shared EOS abort scenario with a {@link ProducerFencedException}. */
    @Test
    public void shouldSwallowExceptionOnEosAbortTxProducerFenced() {
        final RuntimeException fenced = new ProducerFencedException("KABOOM!");
        testSwallowExceptionOnEosAbortTx(fenced);
    }

    /** Runs the shared EOS abort scenario with an {@link InvalidPidMappingException}. */
    @Test
    public void shouldSwallowExceptionOnEosAbortTxInvalidPidMapping() {
        final RuntimeException invalidPidMapping = new InvalidPidMappingException("KABOOM!");
        testSwallowExceptionOnEosAbortTx(invalidPidMapping);
    }

    /** Runs the shared EOS abort scenario with an {@link InvalidProducerEpochException}. */
    @Test
    public void shouldSwallowExceptionOnEosAbortTxInvalidEpoch() {
        final RuntimeException invalidEpoch = new InvalidProducerEpochException("KABOOM!");
        testSwallowExceptionOnEosAbortTx(invalidEpoch);
    }

    /**
     * Shared scenario for the EOS abort tests: the Mockito-backed producer is stubbed to throw
     * {@code runtimeException} from {@code abortTransaction()}, then the streams producer
     * initializes transactions, sends one record and aborts. The scenario passes only if the
     * abort call returns normally, i.e. the stubbed exception does not reach the caller.
     *
     * @param runtimeException the exception the mocked producer throws from {@code abortTransaction()}
     */
    private void testSwallowExceptionOnEosAbortTx(final RuntimeException runtimeException) {
        // Stub with an untyped null: a null cast to Object is not assignable to the stubbed
        // method's return type, so the previous form could not type-check.
        Mockito.when(this.mockedProducer.send(this.record, null)).thenReturn(null);
        Mockito.doThrow(runtimeException).when(this.mockedProducer).abortTransaction();

        this.eosAlphaStreamsProducerWithMock.initTransaction();
        this.eosAlphaStreamsProducerWithMock.send(this.record, null);
        // Any exception escaping from here fails the test.
        this.eosAlphaStreamsProducerWithMock.abortTransaction();

        Mockito.verify(this.mockedProducer).initTransactions();
        Mockito.verify(this.mockedProducer).beginTransaction();
    }

    /**
     * With a {@link KafkaException} installed as the mock producer's
     * {@code abortTransactionException} and one record already sent, aborting the transaction is
     * expected to throw a {@link StreamsException} whose cause is that exception.
     */
    @Test
    public void shouldThrowStreamsExceptionOnEosAbortTxError() {
        this.eosAlphaMockProducer.abortTransactionException = new KafkaException("KABOOM!");
        this.eosAlphaStreamsProducer.send(this.record, null);

        final StreamsException thrown = Assertions.assertThrows(
            StreamsException.class,
            this.eosAlphaStreamsProducer::abortTransaction
        );

        MatcherAssert.assertThat(
            thrown.getCause(),
            Matchers.is(this.eosAlphaMockProducer.abortTransactionException)
        );
        // The expected text (including "encounter") is asserted exactly as written here.
        MatcherAssert.assertThat(
            thrown.getMessage(),
            Matchers.is("Error encounter trying to abort a transaction [test]")
        );
    }

    /**
     * With a plain {@link RuntimeException} installed as the mock producer's
     * {@code abortTransactionException} and one record already sent, aborting the transaction is
     * expected to throw a {@code RuntimeException} carrying the message {@code "KABOOM!"}.
     */
    @Test
    public void shouldFailOnEosAbortTxFatal() {
        this.eosAlphaMockProducer.abortTransactionException = new RuntimeException("KABOOM!");
        this.eosAlphaStreamsProducer.send(this.record, null);

        final RuntimeException thrown = Assertions.assertThrows(
            RuntimeException.class,
            this.eosAlphaStreamsProducer::abortTransaction
        );

        MatcherAssert.assertThat(thrown.getMessage(), Matchers.is("KABOOM!"));
    }

    /** After {@code resetProducer()}, the EOS-beta mock producer must report itself as closed. */
    @Test
    public void shouldCloseExistingProducerOnResetProducer() {
        this.eosBetaStreamsProducer.resetProducer();

        final boolean previousProducerClosed = this.eosBetaMockProducer.closed();
        Assertions.assertTrue(previousProducerClosed);
    }

    /**
     * After {@code resetProducer()}, the client supplier must hold two producers and the streams
     * producer must expose the second one (index 1) as its Kafka producer.
     */
    @Test
    public void shouldSetNewProducerOnResetProducer() {
        this.eosBetaStreamsProducer.resetProducer();

        final int suppliedProducers = this.eosBetaMockClientSupplier.producers.size();
        MatcherAssert.assertThat(suppliedProducers, Matchers.is(2));
        MatcherAssert.assertThat(
            this.eosBetaStreamsProducer.kafkaProducer(),
            Matchers.is(this.eosBetaMockClientSupplier.producers.get(1))
        );
    }

    /**
     * Builds a streams producer from the EOS-beta config, initializes transactions, resets the
     * producer and initializes again. The mocked producer must then have been closed once and had
     * {@code initTransactions()} invoked twice.
     */
    @Test
    public void shouldResetTransactionInitializedOnResetProducer() {
        final StreamsProducer producerUnderTest = new StreamsProducer(
            this.eosBetaConfig,
            "threadId-StreamThread-0",
            this.clientSupplier,
            (TaskId) null,
            UUID.randomUUID(),
            this.logContext,
            this.mockTime
        );
        producerUnderTest.initTransaction();

        // The metrics stub is installed just before the reset, as in the original ordering.
        Mockito.when(this.mockedProducer.metrics()).thenReturn(Collections.emptyMap());
        producerUnderTest.resetProducer();
        producerUnderTest.initTransaction();

        Mockito.verify(this.mockedProducer).close();
        Mockito.verify(this.mockedProducer, Mockito.times(2)).initTransactions();
    }

    /**
     * Registers the eight blocked-time metrics on the non-EOS mock producer and expects the
     * streams producer's {@code totalBlockedTime()} to be 36.0 within a tolerance of 0.01.
     */
    @Test
    public void shouldComputeTotalBlockedTime() {
        setProducerMetrics(
            this.nonEosMockProducer,
            BUFFER_POOL_WAIT_TIME,
            FLUSH_TME,
            TXN_INIT_TIME,
            TXN_BEGIN_TIME,
            TXN_SEND_OFFSETS_TIME,
            TXN_COMMIT_TIME,
            TXN_ABORT_TIME,
            METADATA_WAIT_TIME
        );

        final double totalBlocked = this.nonEosStreamsProducer.totalBlockedTime();
        MatcherAssert.assertThat(totalBlocked, Matchers.closeTo(36.0d, 0.01d));
    }

    /**
     * Registers the blocked-time metrics on the EOS-beta mock producer and expects a total of
     * exactly 36.0. Then stubs the mock clock to return 1 and 2 on consecutive
     * {@code nanoseconds()} calls, resets the producer, registers the same metrics on the
     * replacement producer (index 1 in the supplier) and expects a total of 73.0 within 0.01.
     *
     * <p>NOTE(review): 73 presumably equals twice 36 plus the 1 ns between the two stubbed clock
     * readings; confirm against {@code StreamsProducer.resetProducer()}.
     */
    @Test
    public void shouldComputeTotalBlockedTimeAfterReset() {
        setProducerMetrics(
            this.eosBetaMockProducer,
            BUFFER_POOL_WAIT_TIME,
            FLUSH_TME,
            TXN_INIT_TIME,
            TXN_BEGIN_TIME,
            TXN_SEND_OFFSETS_TIME,
            TXN_COMMIT_TIME,
            TXN_ABORT_TIME,
            METADATA_WAIT_TIME
        );
        final double blockedBeforeReset = this.eosBetaStreamsProducer.totalBlockedTime();
        MatcherAssert.assertThat(blockedBeforeReset, Matchers.equalTo(36.0d));

        final long firstClockReading = 1L;
        final long secondClockReading = 2L;
        Mockito.when(this.mockTime.nanoseconds())
            .thenReturn(firstClockReading)
            .thenReturn(secondClockReading);
        this.eosBetaStreamsProducer.resetProducer();

        setProducerMetrics(
            this.eosBetaMockClientSupplier.producers.get(1),
            BUFFER_POOL_WAIT_TIME,
            FLUSH_TME,
            TXN_INIT_TIME,
            TXN_BEGIN_TIME,
            TXN_SEND_OFFSETS_TIME,
            TXN_COMMIT_TIME,
            TXN_ABORT_TIME,
            METADATA_WAIT_TIME
        );
        final double blockedAfterReset = this.eosBetaStreamsProducer.totalBlockedTime();
        MatcherAssert.assertThat(blockedAfterReset, Matchers.closeTo(73.0d, 0.01d));
    }

    /**
     * Builds a {@link MetricName} for {@code str} with two empty-string arguments and an empty
     * map for the remaining constructor parameters.
     *
     * @param str the metric's name
     * @return the constructed metric name
     */
    private MetricName metricName(String str) {
        final String empty = "";
        return new MetricName(str, empty, empty, Collections.emptyMap());
    }

    /**
     * Registers a constant-valued metric on {@code mockProducer} via {@code setMockMetrics}.
     *
     * @param mockProducer the mock producer receiving the metric
     * @param str the metric's name, turned into a {@link MetricName} by {@link #metricName(String)}
     * @param d the value the metric reports from {@code metricValue()}
     */
    private void addMetric(MockProducer<?, ?> mockProducer, String str, final double d) {
        final MetricName name = metricName(str);
        final Metric constantMetric = new Metric() {
            @Override
            public MetricName metricName() {
                return name;
            }

            @Override
            public Object metricValue() {
                return d;
            }
        };
        mockProducer.setMockMetrics(name, constantMetric);
    }

    /**
     * Registers eight metrics on {@code mockProducer}, pairing each fixed metric name with the
     * corresponding argument, in the order listed below.
     *
     * @param mockProducer the mock producer receiving the metrics
     * @param d value for {@code bufferpool-wait-time-ns-total}
     * @param d2 value for {@code flush-time-ns-total}
     * @param d3 value for {@code txn-init-time-ns-total}
     * @param d4 value for {@code txn-begin-time-ns-total}
     * @param d5 value for {@code txn-send-offsets-time-ns-total}
     * @param d6 value for {@code txn-commit-time-ns-total}
     * @param d7 value for {@code txn-abort-time-ns-total}
     * @param d8 value for {@code metadata-wait-time-ns-total}
     */
    private void setProducerMetrics(MockProducer<?, ?> mockProducer, double d, double d2, double d3, double d4, double d5, double d6, double d7, double d8) {
        final String[] metricNames = {
            "bufferpool-wait-time-ns-total",
            "flush-time-ns-total",
            "txn-init-time-ns-total",
            "txn-begin-time-ns-total",
            "txn-send-offsets-time-ns-total",
            "txn-commit-time-ns-total",
            "txn-abort-time-ns-total",
            "metadata-wait-time-ns-total"
        };
        final double[] metricValues = {d, d2, d3, d4, d5, d6, d7, d8};

        // Names and values are index-aligned; registration order matches the listing above.
        for (int i = 0; i < metricNames.length; i++) {
            addMetric(mockProducer, metricNames[i], metricValues[i]);
        }
    }
}
