package org.apache.kafka.streams.processor.internals;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.Task;
import org.slf4j.Logger;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/kafka/streams/processor/internals/Tasks.class */
/**
 * In-memory bookkeeping of tasks, implementing {@link TasksRegistry}.
 *
 * <p>The registry tracks:
 * <ul>
 *   <li>owned active and standby tasks, keyed by {@link TaskId} in sorted maps;</li>
 *   <li>an index from input {@link TopicPartition} to the active task registered for it;</li>
 *   <li>active and standby tasks that are pending creation, keyed by id with their partitions;</li>
 *   <li>tasks that are pending initialization;</li>
 *   <li>the ids of tasks that were registered through {@link #addFailedTask(Task)}.</li>
 * </ul>
 *
 * <p>Only the methods declared {@code synchronized} lock on this instance. The pending-task
 * collections and the partition index are also read and written by unsynchronized methods.
 */
public class Tasks implements TasksRegistry {
    private final Logger log;
    private final Map<TaskId, Task> activeTasksPerId = new TreeMap<>();
    private final Map<TaskId, Task> standbyTasksPerId = new TreeMap<>();
    private final Map<TaskId, Set<TopicPartition>> pendingActiveTasksToCreate = new HashMap<>();
    private final Map<TaskId, Set<TopicPartition>> pendingStandbyTasksToCreate = new HashMap<>();
    private final Set<Task> pendingTasksToInit = new HashSet<>();
    private final Set<TaskId> failedTaskIds = new HashSet<>();
    private final Map<TopicPartition, Task> activeTasksPerPartition = new HashMap<>();

    /**
     * Creates an empty registry.
     *
     * @param logContext used to obtain the logger for this class
     */
    public Tasks(LogContext logContext) {
        this.log = logContext.logger(getClass());
    }

    /** Discards all active and standby tasks that are pending creation. */
    @Override
    public void clearPendingTasksToCreate() {
        this.pendingActiveTasksToCreate.clear();
        this.pendingStandbyTasksToCreate.clear();
    }

    /**
     * Removes and returns the pending active tasks whose topology name is contained in {@code set}.
     *
     * @param set topology names to drain
     * @return the drained entries, task id to partitions
     */
    @Override
    public Map<TaskId, Set<TopicPartition>> drainPendingActiveTasksForTopologies(Set<String> set) {
        return drainPendingForTopologies(this.pendingActiveTasksToCreate, set);
    }

    /**
     * Removes and returns the pending standby tasks whose topology name is contained in {@code set}.
     *
     * @param set topology names to drain
     * @return the drained entries, task id to partitions
     */
    @Override
    public Map<TaskId, Set<TopicPartition>> drainPendingStandbyTasksForTopologies(Set<String> set) {
        return drainPendingForTopologies(this.pendingStandbyTasksToCreate, set);
    }

    /** Selects the entries of {@code pending} belonging to the given topologies and removes their keys from it. */
    private static Map<TaskId, Set<TopicPartition>> drainPendingForTopologies(
            Map<TaskId, Set<TopicPartition>> pending, Set<String> topologyNames) {
        Map<TaskId, Set<TopicPartition>> drained =
            Utils.filterMap(pending, entry -> topologyNames.contains(entry.getKey().topologyName()));
        pending.keySet().removeAll(drained.keySet());
        return drained;
    }

    /**
     * Records active tasks as pending creation; an existing entry with the same id is overwritten.
     *
     * @param map task id to partitions of the tasks to record
     */
    @Override
    public void addPendingActiveTasksToCreate(Map<TaskId, Set<TopicPartition>> map) {
        this.pendingActiveTasksToCreate.putAll(map);
    }

    /**
     * Records standby tasks as pending creation; an existing entry with the same id is overwritten.
     *
     * @param map task id to partitions of the tasks to record
     */
    @Override
    public void addPendingStandbyTasksToCreate(Map<TaskId, Set<TopicPartition>> map) {
        this.pendingStandbyTasksToCreate.putAll(map);
    }

    /**
     * Removes and returns every task that is pending initialization.
     *
     * @return a new set holding the drained tasks
     */
    @Override
    public Set<Task> drainPendingTasksToInit() {
        Set<Task> drained = new HashSet<>(this.pendingTasksToInit);
        this.pendingTasksToInit.clear();
        return drained;
    }

    /**
     * Removes and returns only the active tasks that are pending initialization;
     * non-active pending tasks stay in the registry.
     *
     * @return a new set holding the drained active tasks
     */
    @Override
    public Set<Task> drainPendingActiveTasksToInit() {
        Set<Task> drained = new HashSet<>();
        // Explicit iterator so matching tasks can be removed while iterating.
        Iterator<Task> pending = this.pendingTasksToInit.iterator();
        while (pending.hasNext()) {
            Task candidate = pending.next();
            if (candidate.isActive()) {
                drained.add(candidate);
                pending.remove();
            }
        }
        return drained;
    }

    /**
     * @return an unmodifiable view of the tasks that are pending initialization
     */
    @Override
    public Set<Task> pendingTasksToInit() {
        return Collections.unmodifiableSet(this.pendingTasksToInit);
    }

    /**
     * Adds tasks to the set of tasks pending initialization.
     *
     * @param collection tasks to add
     */
    @Override
    public void addPendingTasksToInit(Collection<Task> collection) {
        this.pendingTasksToInit.addAll(collection);
    }

    /**
     * @return {@code true} if at least one task is pending initialization
     */
    @Override
    public boolean hasPendingTasksToInit() {
        return !this.pendingTasksToInit.isEmpty();
    }

    /**
     * Registers each given task via {@link #addTask(Task)}.
     *
     * @param collection tasks to register
     * @throws IllegalStateException if a task with the same id is already owned
     */
    @Override
    public void addActiveTasks(Collection<Task> collection) {
        addAll(collection);
    }

    /**
     * Registers each given task via {@link #addTask(Task)}.
     *
     * @param collection tasks to register
     * @throws IllegalStateException if a task with the same id is already owned
     */
    @Override
    public void addStandbyTasks(Collection<Task> collection) {
        addAll(collection);
    }

    /** Registers the tasks one by one; each call to {@code addTask} locks separately. */
    private void addAll(Collection<Task> tasks) {
        for (Task task : tasks) {
            addTask(task);
        }
    }

    /**
     * Registers a task as owned. An active task is stored in the active map, dropped from the
     * pending-active-creation map and indexed by each of its input partitions; any other task is
     * stored in the standby map.
     *
     * @param task the task to register
     * @throws IllegalStateException if a task with the same id is already owned as active or standby
     */
    @Override
    public synchronized void addTask(Task task) {
        TaskId id = task.id();
        if (this.activeTasksPerId.containsKey(id)) {
            throw new IllegalStateException("Attempted to create an active task that we already own: " + id);
        }
        if (this.standbyTasksPerId.containsKey(id)) {
            throw new IllegalStateException("Attempted to create an active task while we already own its standby: " + id);
        }

        if (task.isActive()) {
            this.activeTasksPerId.put(task.id(), task);
            this.pendingActiveTasksToCreate.remove(task.id());
            indexInputPartitions(task);
        } else {
            this.standbyTasksPerId.put(task.id(), task);
        }
    }

    /** Maps every current input partition of {@code task} to it in the partition index. */
    private void indexInputPartitions(Task task) {
        for (TopicPartition partition : task.inputPartitions()) {
            this.activeTasksPerPartition.put(partition, task);
        }
    }

    /**
     * Marks the task's id as failed and then registers the task via {@link #addTask(Task)}.
     *
     * @param task the failed task to register
     * @throws IllegalStateException if a task with the same id is already owned
     */
    @Override
    public void addFailedTask(Task task) {
        this.failedTaskIds.add(task.id());
        addTask(task);
    }

    /**
     * Removes an owned task; for an active task its partition-index entries are removed too.
     * The task's id is also cleared from the failed ids.
     *
     * @param task the task to remove
     * @throws IllegalStateException if the task's state is neither CLOSED nor SUSPENDED
     * @throws IllegalArgumentException if the task is not owned
     */
    @Override
    public synchronized void removeTask(Task task) {
        TaskId id = task.id();
        if (task.state() != Task.State.CLOSED && task.state() != Task.State.SUSPENDED) {
            throw new IllegalStateException("Attempted to remove a task that is not closed or suspended: " + id);
        }

        if (task.isActive()) {
            if (this.activeTasksPerId.remove(id) == null) {
                throw new IllegalArgumentException("Attempted to remove an active task that is not owned: " + id);
            }
            removePartitionsForActiveTask(id);
        } else if (this.standbyTasksPerId.remove(id) == null) {
            throw new IllegalArgumentException("Attempted to remove a standby task that is not owned: " + id);
        }
        this.failedTaskIds.remove(task.id());
    }

    /**
     * Replaces the owned active task that has the same id with the given standby task and removes
     * the active task's partition-index entries.
     *
     * @param standbyTask the standby task to register in place of the active one
     * @throws IllegalStateException if no active task with that id is owned
     */
    @Override
    public synchronized void replaceActiveWithStandby(StandbyTask standbyTask) {
        TaskId id = standbyTask.id();
        if (this.activeTasksPerId.remove(id) == null) {
            throw new IllegalStateException("Attempted to replace unknown active task with standby task: " + id);
        }
        removePartitionsForActiveTask(id);
        this.standbyTasksPerId.put(standbyTask.id(), standbyTask);
    }

    /**
     * Replaces the owned standby task that has the same id with the given stream task and indexes
     * the stream task's input partitions.
     *
     * @param streamTask the active task to register in place of the standby one
     * @throws IllegalStateException if no standby task with that id is owned
     */
    @Override
    public synchronized void replaceStandbyWithActive(StreamTask streamTask) {
        TaskId id = streamTask.id();
        if (this.standbyTasksPerId.remove(id) == null) {
            throw new IllegalStateException("Attempted to convert unknown standby task to stream task: " + id);
        }
        this.activeTasksPerId.put(streamTask.id(), streamTask);
        indexInputPartitions(streamTask);
    }

    /**
     * Re-indexes an active task under a new set of input partitions if that set differs from the
     * task's current one. This method changes only the registry's partition index; it does not
     * modify the task itself.
     *
     * @param task the task whose partitions are compared and re-indexed
     * @param set the new input partitions
     * @return {@code true} if {@code set} differs from the task's current input partitions
     */
    @Override
    public boolean updateActiveTaskInputPartitions(Task task, Set<TopicPartition> set) {
        boolean changed = !task.inputPartitions().equals(set);
        if (changed) {
            this.log.debug("Update task {} inputPartitions: current {}, new {}", task, task.inputPartitions(), set);
            if (task.isActive()) {
                for (TopicPartition stale : task.inputPartitions()) {
                    this.activeTasksPerPartition.remove(stale);
                }
                for (TopicPartition fresh : set) {
                    this.activeTasksPerPartition.put(fresh, task);
                }
            }
        }
        return changed;
    }

    /** Drops every partition-index entry whose task has the given id. */
    private void removePartitionsForActiveTask(TaskId taskId) {
        // Removing through the values view removes the corresponding map entries.
        this.activeTasksPerPartition.values().removeIf(indexed -> indexed.id().equals(taskId));
    }

    /**
     * Forgets all owned tasks, the partition index and the failed ids. The pending-creation and
     * pending-initialization collections are not touched.
     */
    @Override
    public synchronized void clear() {
        this.activeTasksPerId.clear();
        this.standbyTasksPerId.clear();
        this.activeTasksPerPartition.clear();
        this.failedTaskIds.clear();
    }

    /**
     * @param topicPartition the input partition to look up
     * @return the active task indexed under the partition, or {@code null} if there is none
     */
    @Override
    public Task activeTasksForInputPartition(TopicPartition topicPartition) {
        return this.activeTasksPerPartition.get(topicPartition);
    }

    /** Looks the id up among active tasks first, then standby tasks; returns {@code null} if not owned. */
    private synchronized Task getTask(TaskId taskId) {
        if (this.activeTasksPerId.containsKey(taskId)) {
            return this.activeTasksPerId.get(taskId);
        }
        if (this.standbyTasksPerId.containsKey(taskId)) {
            return this.standbyTasksPerId.get(taskId);
        }
        return null;
    }

    /**
     * @param taskId id of the task to return
     * @return the owned task with the given id
     * @throws IllegalStateException if no task with that id is owned
     */
    @Override
    public Task task(TaskId taskId) {
        Task found = getTask(taskId);
        if (found == null) {
            throw new IllegalStateException("Task unknown: " + taskId);
        }
        return found;
    }

    /**
     * @param collection ids of the tasks to return
     * @return a new set with the owned task for each id
     * @throws IllegalStateException if any id is not owned
     */
    @Override
    public Collection<Task> tasks(Collection<TaskId> collection) {
        Set<Task> found = new HashSet<>();
        for (TaskId id : collection) {
            found.add(task(id));
        }
        return found;
    }

    /**
     * @return an unmodifiable view of the ids of the owned active tasks
     */
    @Override
    public synchronized Collection<TaskId> activeTaskIds() {
        return Collections.unmodifiableCollection(this.activeTasksPerId.keySet());
    }

    /**
     * @return an unmodifiable view of the owned active tasks
     */
    @Override
    public synchronized Collection<Task> activeTasks() {
        return Collections.unmodifiableCollection(this.activeTasksPerId.values());
    }

    /**
     * @return a new set containing all owned active and standby tasks
     */
    @Override
    public synchronized Set<Task> allTasks() {
        return Utils.union(
            HashSet::new,
            new HashSet<>(this.activeTasksPerId.values()),
            new HashSet<>(this.standbyTasksPerId.values()));
    }

    /**
     * @return a new set containing all owned tasks whose id is not marked as failed
     */
    @Override
    public synchronized Set<Task> allNonFailedTasks() {
        return Utils.union(
            HashSet::new,
            nonFailed(this.activeTasksPerId.values()),
            nonFailed(this.standbyTasksPerId.values()));
    }

    /** Collects the tasks whose id is not in the failed ids. */
    private Set<Task> nonFailed(Collection<Task> tasks) {
        return tasks.stream()
            .filter(task -> !this.failedTaskIds.contains(task.id()))
            .collect(Collectors.toSet());
    }

    /**
     * @return a new set containing the ids of all owned active and standby tasks
     */
    @Override
    public synchronized Set<TaskId> allTaskIds() {
        return Utils.union(HashSet::new, this.activeTasksPerId.keySet(), this.standbyTasksPerId.keySet());
    }

    /**
     * @return a new map from id to task covering all owned active and standby tasks
     */
    @Override
    public synchronized Map<TaskId, Task> allTasksPerId() {
        Map<TaskId, Task> all = new HashMap<>();
        all.putAll(this.activeTasksPerId);
        all.putAll(this.standbyTasksPerId);
        return all;
    }

    /**
     * @param taskId id to check
     * @return {@code true} if a task with the given id is owned as active or standby
     */
    @Override
    public boolean contains(TaskId taskId) {
        return getTask(taskId) != null;
    }
}
