package org.apache.kafka.streams.processor.internals.tasks;

import java.time.Duration;
import java.util.Collections;
import java.util.Objects;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.processor.internals.TaskExecutionMetadata;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutorTest.class */
/**
 * Unit tests for {@link DefaultTaskExecutor}.
 *
 * <p>The executor under test is wired to Mockito mocks of {@link TaskManager}, {@link StreamTask} and
 * {@link TaskExecutionMetadata}. Every test calls {@code start()} and then observes the executor only
 * through interactions with those mocks; because the executor presumably does its work on a separate
 * thread (TODO confirm against {@code DefaultTaskExecutor}), such interactions are verified with
 * {@link Mockito#timeout(long)} instead of being asserted immediately.
 */
public class DefaultTaskExecutorTest {
    /** Upper bound, in milliseconds, that a timeout-based Mockito verification waits for an interaction. */
    private static final long VERIFICATION_TIMEOUT = 15000;

    /** Maximum time to wait for the executor to stop after shutdown has been requested. */
    private static final Duration SHUTDOWN_TIMEOUT = Duration.ofMinutes(1L);

    private final Time time = new MockTime(1);
    private final StreamTask task = Mockito.mock(StreamTask.class);
    private final TaskManager taskManager = Mockito.mock(TaskManager.class);
    private final TaskExecutionMetadata taskExecutionMetadata = Mockito.mock(TaskExecutionMetadata.class);
    private final DefaultTaskExecutor taskExecutor =
        new DefaultTaskExecutor(taskManager, "TaskExecutor", time, taskExecutionMetadata);

    /**
     * Installs the default stubbing shared by all tests: the task manager hands out {@link #task} exactly
     * once and {@code null} afterwards, and the task is processable, processes successfully and has
     * nothing to commit. Individual tests override these stubs where they need different behavior.
     */
    @BeforeEach
    public void setUp() {
        Mockito.when(taskManager.assignNextTask(taskExecutor)).thenReturn(task).thenReturn(null);
        Mockito.when(taskExecutionMetadata.canProcessTask(ArgumentMatchers.eq(task), ArgumentMatchers.anyLong()))
            .thenReturn(true);
        Mockito.when(task.isProcessable(ArgumentMatchers.anyLong())).thenReturn(true);
        Mockito.when(task.id()).thenReturn(new TaskId(0, 0, "A"));
        Mockito.when(task.process(ArgumentMatchers.anyLong())).thenReturn(true);
        Mockito.when(task.prepareCommit()).thenReturn(Collections.emptyMap());
    }

    /** Stops the executor after every test so that no executor started by a test outlives it. */
    @AfterEach
    public void tearDown() {
        shutdownAndAwait();
    }

    /** Requests executor shutdown and waits up to {@link #SHUTDOWN_TIMEOUT} for it to complete. */
    private void shutdownAndAwait() {
        taskExecutor.requestShutdown();
        taskExecutor.awaitShutdown(SHUTDOWN_TIMEOUT);
    }

    /** Shutting down must flush and hand back the assigned task and leave the executor not running. */
    @Test
    public void shouldShutdownTaskExecutor() {
        Assertions.assertNull(taskExecutor.currentTask(), "Have task assigned before startup");
        Assertions.assertFalse(taskExecutor.isRunning());

        taskExecutor.start();

        Assertions.assertTrue(taskExecutor.isRunning());
        Mockito.verify(taskManager, Mockito.timeout(VERIFICATION_TIMEOUT)).assignNextTask(taskExecutor);

        shutdownAndAwait();

        Mockito.verify(task).flush();
        Mockito.verify(taskManager).unassignTask(task, taskExecutor);
        Assertions.assertNull(taskExecutor.currentTask(), "Have task assigned after shutdown");
        Assertions.assertFalse(taskExecutor.isRunning());
    }

    /** A pending unassign future must be completed when the executor shuts down. */
    @Test
    public void shouldClearTaskReleaseFutureOnShutdown() throws InterruptedException {
        Assertions.assertNull(taskExecutor.currentTask(), "Have task assigned before startup");

        taskExecutor.start();
        Mockito.verify(taskManager, Mockito.timeout(VERIFICATION_TIMEOUT)).assignNextTask(taskExecutor);

        final KafkaFuture<StreamTask> unassign =
            Objects.requireNonNull(taskExecutor.unassign(), "unassign() returned no future");
        shutdownAndAwait();

        TestUtils.waitForCondition(unassign::isDone, "Await for unassign future to complete");
        Assertions.assertNull(taskExecutor.currentTask(), "Have task assigned after shutdown");
    }

    /** With no task to hand out, the executor must wait on the task manager for processable tasks. */
    @Test
    public void shouldAwaitProcessableTasksIfNoneAssignable() throws InterruptedException {
        Assertions.assertNull(taskExecutor.currentTask(), "Have task assigned before startup");
        // Override the default stubbing: the task manager never has a task for this executor.
        Mockito.when(taskManager.assignNextTask(taskExecutor)).thenReturn(null);

        taskExecutor.start();

        Mockito.verify(taskManager, Mockito.timeout(VERIFICATION_TIMEOUT).atLeastOnce()).awaitProcessableTasks();
    }

    /** A task that can neither process nor punctuate must be flushed and given back to the task manager. */
    @Test
    public void shouldUnassignTaskWhenNotProgressing() {
        Mockito.when(task.isProcessable(ArgumentMatchers.anyLong())).thenReturn(false);
        Mockito.when(task.maybePunctuateStreamTime()).thenReturn(false);
        Mockito.when(task.maybePunctuateSystemTime()).thenReturn(false);

        taskExecutor.start();

        Mockito.verify(taskManager, Mockito.timeout(VERIFICATION_TIMEOUT)).unassignTask(task, taskExecutor);
        Mockito.verify(task).flush();
        // NOTE(review): asserted without a timeout; if the executor clears its current task only after
        // calling unassignTask, this could race with the executor -- confirm.
        Assertions.assertNull(taskExecutor.currentTask());
    }

    /** A processable task must be processed repeatedly and have its batch time recorded. */
    @Test
    public void shouldProcessTasks() {
        Mockito.when(taskExecutionMetadata.canProcessTask(ArgumentMatchers.any(), ArgumentMatchers.anyLong()))
            .thenReturn(true);
        Mockito.when(task.isProcessable(ArgumentMatchers.anyLong())).thenReturn(true);

        taskExecutor.start();

        Mockito.verify(task, Mockito.timeout(VERIFICATION_TIMEOUT).atLeast(2)).process(ArgumentMatchers.anyLong());
        Mockito.verify(task, Mockito.timeout(VERIFICATION_TIMEOUT).atLeastOnce())
            .recordProcessBatchTime(ArgumentMatchers.anyLong());
    }

    /** Successful processing must clear the task timeout. */
    @Test
    public void shouldClearTaskTimeoutOnProcessed() {
        Mockito.when(taskExecutionMetadata.canProcessTask(ArgumentMatchers.any(), ArgumentMatchers.anyLong()))
            .thenReturn(true);
        Mockito.when(task.isProcessable(ArgumentMatchers.anyLong())).thenReturn(true);
        Mockito.when(task.process(ArgumentMatchers.anyLong())).thenReturn(true);

        taskExecutor.start();

        Mockito.verify(task, Mockito.timeout(VERIFICATION_TIMEOUT).atLeastOnce()).clearTaskTimeout();
    }

    /** A {@link TimeoutException} thrown while processing must be passed on to the task's timeout handling. */
    @Test
    public void shouldSetTaskTimeoutOnTimeoutException() {
        final TimeoutException timeoutException = new TimeoutException();
        Mockito.when(taskExecutionMetadata.canProcessTask(ArgumentMatchers.any(), ArgumentMatchers.anyLong()))
            .thenReturn(true);
        Mockito.when(task.isProcessable(ArgumentMatchers.anyLong())).thenReturn(true);
        // The first call succeeds; every later call throws the timeout exception.
        Mockito.when(task.process(ArgumentMatchers.anyLong())).thenReturn(true).thenThrow(timeoutException);

        taskExecutor.start();

        Mockito.verify(task, Mockito.timeout(VERIFICATION_TIMEOUT).atLeastOnce()).process(ArgumentMatchers.anyLong());
        Mockito.verify(task, Mockito.timeout(VERIFICATION_TIMEOUT).atLeastOnce())
            .maybeInitTaskTimeoutOrThrow(ArgumentMatchers.anyLong(), ArgumentMatchers.eq(timeoutException));
    }

    /** With processing disabled and punctuation enabled, stream-time punctuation must keep being attempted. */
    @Test
    public void shouldPunctuateStreamTime() {
        Mockito.when(taskExecutionMetadata.canProcessTask(ArgumentMatchers.eq(task), ArgumentMatchers.anyLong()))
            .thenReturn(false);
        Mockito.when(taskExecutionMetadata.canPunctuateTask(task)).thenReturn(true);
        Mockito.when(task.maybePunctuateStreamTime()).thenReturn(true);

        taskExecutor.start();

        Mockito.verify(task, Mockito.timeout(VERIFICATION_TIMEOUT).atLeast(2)).maybePunctuateStreamTime();
    }

    /** With processing disabled and punctuation enabled, system-time punctuation must keep being attempted. */
    @Test
    public void shouldPunctuateSystemTime() {
        Mockito.when(taskExecutionMetadata.canProcessTask(ArgumentMatchers.eq(task), ArgumentMatchers.anyLong()))
            .thenReturn(false);
        Mockito.when(taskExecutionMetadata.canPunctuateTask(task)).thenReturn(true);
        Mockito.when(task.maybePunctuateSystemTime()).thenReturn(true);

        taskExecutor.start();

        Mockito.verify(task, Mockito.timeout(VERIFICATION_TIMEOUT).atLeast(2)).maybePunctuateSystemTime();
    }

    /** When the metadata forbids punctuation, the task is processed but never punctuated. */
    @Test
    public void shouldRespectPunctuationDisabledByTaskExecutionMetadata() {
        Mockito.when(taskExecutionMetadata.canProcessTask(ArgumentMatchers.eq(task), ArgumentMatchers.anyLong()))
            .thenReturn(true);
        Mockito.when(taskExecutionMetadata.canPunctuateTask(task)).thenReturn(false);
        Mockito.when(task.isProcessable(ArgumentMatchers.anyLong())).thenReturn(true);

        taskExecutor.start();

        Mockito.verify(task, Mockito.timeout(VERIFICATION_TIMEOUT).atLeast(2)).process(ArgumentMatchers.anyLong());

        // Wait for the task to be handed back before asserting that punctuation never happened.
        taskExecutor.unassign();
        Mockito.verify(taskManager, Mockito.timeout(VERIFICATION_TIMEOUT)).unassignTask(task, taskExecutor);

        Mockito.verify(task, Mockito.never()).maybePunctuateStreamTime();
        Mockito.verify(task, Mockito.never()).maybePunctuateSystemTime();
    }

    /** When the metadata forbids processing, the task is punctuated but never processed. */
    @Test
    public void shouldRespectProcessingDisabledByTaskExecutionMetadata() {
        Mockito.when(taskExecutionMetadata.canProcessTask(ArgumentMatchers.eq(task), ArgumentMatchers.anyLong()))
            .thenReturn(false);
        Mockito.when(taskExecutionMetadata.canPunctuateTask(task)).thenReturn(true);
        Mockito.when(task.isProcessable(ArgumentMatchers.anyLong())).thenReturn(true);

        taskExecutor.start();

        Mockito.verify(task, Mockito.timeout(VERIFICATION_TIMEOUT)).maybePunctuateSystemTime();
        Mockito.verify(task, Mockito.timeout(VERIFICATION_TIMEOUT)).maybePunctuateStreamTime();

        // Wait for the task to be handed back before asserting that processing never happened.
        taskExecutor.unassign();
        Mockito.verify(taskManager, Mockito.timeout(VERIFICATION_TIMEOUT)).unassignTask(task, taskExecutor);

        Mockito.verify(task, Mockito.never()).process(ArgumentMatchers.anyLong());
    }

    /** An explicit unassign request must flush the task, hand it back and complete the future with it. */
    @Test
    public void shouldUnassignTaskWhenRequired() throws Exception {
        taskExecutor.start();

        Mockito.verify(taskManager, Mockito.timeout(VERIFICATION_TIMEOUT)).assignNextTask(taskExecutor);
        // NOTE(review): the verification above only proves assignNextTask was invoked; whether the executor
        // has already stored the returned task at this point is not visible from here -- confirm.
        Assertions.assertNotNull(taskExecutor.currentTask());

        final KafkaFuture<StreamTask> unassign = taskExecutor.unassign();

        Mockito.verify(taskManager, Mockito.timeout(VERIFICATION_TIMEOUT)).unassignTask(task, taskExecutor);
        Mockito.verify(task).flush();
        Assertions.assertNull(taskExecutor.currentTask());
        Assertions.assertTrue(unassign.isDone(), "Unassign is not completed");
        Assertions.assertEquals(task, unassign.get(), "Unexpected task was unassigned");
    }

    /** A {@link StreamsException} from processing is reported to the task manager without stopping the executor. */
    @Test
    public void shouldSetUncaughtStreamsException() {
        final StreamsException exception = Mockito.mock(StreamsException.class);
        Mockito.when(task.process(ArgumentMatchers.anyLong())).thenThrow(exception);

        taskExecutor.start();

        Mockito.verify(taskManager, Mockito.timeout(VERIFICATION_TIMEOUT)).setUncaughtException(exception, task.id());
        Mockito.verify(taskManager, Mockito.timeout(VERIFICATION_TIMEOUT)).unassignTask(task, taskExecutor);
        Assertions.assertNull(taskExecutor.currentTask());
        Assertions.assertTrue(taskExecutor.isRunning(), "should not shut down upon exception");
    }

    /** A task with a recorded uncaught exception must be handed back without being flushed. */
    @Test
    public void shouldNotFlushOnException() {
        final StreamsException exception = Mockito.mock(StreamsException.class);
        Mockito.when(task.process(ArgumentMatchers.anyLong())).thenThrow(exception);
        Mockito.when(taskManager.hasUncaughtException(task.id())).thenReturn(true);

        taskExecutor.start();

        Mockito.verify(taskManager, Mockito.timeout(VERIFICATION_TIMEOUT)).unassignTask(task, taskExecutor);
        Mockito.verify(task, Mockito.never()).flush();
    }
}
