package org.apache.tez.dag.library.vertexmanager;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.VertexManagerPlugin;
import org.apache.tez.dag.api.VertexManagerPluginContext;
import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
import org.apache.tez.dag.api.event.VertexState;
import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.TaskAttemptIdentifier;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/tez/dag/library/vertexmanager/VertexManagerWithConcurrentInput.class */
public class VertexManagerWithConcurrentInput extends VertexManagerPlugin {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) VertexManagerWithConcurrentInput.class);
    private final Map<String, Boolean> srcVerticesConfigured;
    private int managedTasks;
    private AtomicBoolean tasksScheduled;
    private AtomicBoolean onVertexStartedDone;
    private Configuration vertexConfig;
    private String vertexName;
    private EdgeProperty.ConcurrentEdgeTriggerType edgeTriggerType;
    private volatile boolean allSrcVerticesConfigured;
    int completedUpstreamTasks;

    /* loaded from: input_file:org/apache/tez/dag/library/vertexmanager/VertexManagerWithConcurrentInput$ConcurrentInputVertexManagerConfigBuilder.class */
    public static final class ConcurrentInputVertexManagerConfigBuilder {
        private final Configuration conf;

        private ConcurrentInputVertexManagerConfigBuilder(@Nullable Configuration configuration) {
            if (configuration == null) {
                this.conf = new Configuration(false);
            } else {
                this.conf = configuration;
            }
        }

        public VertexManagerPluginDescriptor build() {
            try {
                return VertexManagerPluginDescriptor.create(VertexManagerWithConcurrentInput.class.getName()).setUserPayload(TezUtils.createUserPayloadFromConf(this.conf));
            } catch (IOException e) {
                throw new TezUncheckedException(e);
            }
        }
    }

    public VertexManagerWithConcurrentInput(VertexManagerPluginContext vertexManagerPluginContext) {
        super(vertexManagerPluginContext);
        this.srcVerticesConfigured = Maps.newConcurrentMap();
        this.tasksScheduled = new AtomicBoolean(false);
        this.onVertexStartedDone = new AtomicBoolean(false);
    }

    @Override // org.apache.tez.dag.api.VertexManagerPlugin
    public void initialize() {
        UserPayload userPayload = getContext().getUserPayload();
        if (userPayload == null || userPayload.getPayload() == null || userPayload.getPayload().limit() == 0) {
            throw new TezUncheckedException("Could not initialize VertexManagerWithConcurrentInput from provided user payload");
        }
        this.managedTasks = getContext().getVertexNumTasks(getContext().getVertexName());
        for (Map.Entry<String, EdgeProperty> entry : getContext().getInputVertexEdgeProperties().entrySet()) {
            if (!EdgeProperty.SchedulingType.CONCURRENT.equals(entry.getValue().getSchedulingType())) {
                throw new TezUncheckedException("All input edges to vertex " + this.vertexName + "  must be CONCURRENT.");
            }
            String key = entry.getKey();
            this.srcVerticesConfigured.put(key, false);
            getContext().registerForVertexStateUpdates(key, EnumSet.of(VertexState.CONFIGURED));
        }
        try {
            this.vertexConfig = TezUtils.createConfFromUserPayload(getContext().getUserPayload());
            this.edgeTriggerType = EdgeProperty.ConcurrentEdgeTriggerType.valueOf(this.vertexConfig.get(TezConfiguration.TEZ_CONCURRENT_EDGE_TRIGGER_TYPE, TezConfiguration.TEZ_CONCURRENT_EDGE_TRIGGER_TYPE_DEFAULT));
            if (!EdgeProperty.ConcurrentEdgeTriggerType.SOURCE_VERTEX_CONFIGURED.equals(this.edgeTriggerType)) {
                throw new TezUncheckedException("Only support SOURCE_VERTEX_CONFIGURED triggering type for now.");
            }
            LOG.info("VertexManagerWithConcurrentInput initialized with edgeTriggerType {}.", this.edgeTriggerType);
            this.vertexName = getContext().getVertexName();
            this.completedUpstreamTasks = 0;
        } catch (IOException e) {
            throw new TezUncheckedException(e);
        }
    }

    @Override // org.apache.tez.dag.api.VertexManagerPlugin
    public synchronized void onVertexStarted(List<TaskAttemptIdentifier> list) {
        this.onVertexStartedDone.set(true);
        scheduleTasks();
    }

    @Override // org.apache.tez.dag.api.VertexManagerPlugin
    public synchronized void onVertexStateUpdated(VertexStateUpdate vertexStateUpdate) {
        VertexState vertexState = vertexStateUpdate.getVertexState();
        String vertexName = vertexStateUpdate.getVertexName();
        if (!this.srcVerticesConfigured.containsKey(vertexName)) {
            throw new IllegalArgumentException("Not expecting state update from vertex:" + vertexName + " in vertex: " + this.vertexName);
        }
        if (!VertexState.CONFIGURED.equals(vertexState)) {
            throw new IllegalArgumentException("Received incorrect state notification : " + vertexState + " from vertex: " + vertexName + " in vertex: " + this.vertexName);
        }
        LOG.info("Received configured notification: " + vertexState + " for vertex: " + vertexName + " in vertex: " + this.vertexName);
        this.srcVerticesConfigured.put(vertexName, true);
        boolean z = true;
        Iterator<Map.Entry<String, Boolean>> it = this.srcVerticesConfigured.entrySet().iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            Map.Entry<String, Boolean> next = it.next();
            if (!next.getValue().booleanValue()) {
                LOG.info("Waiting for vertex {} in vertex {} ", next.getKey(), this.vertexName);
                z = false;
                break;
            }
        }
        this.allSrcVerticesConfigured = z;
        scheduleTasks();
    }

    @Override // org.apache.tez.dag.api.VertexManagerPlugin
    public synchronized void onSourceTaskCompleted(TaskAttemptIdentifier taskAttemptIdentifier) {
        this.completedUpstreamTasks++;
        LOG.info("Source task attempt {} completion received at vertex {}", taskAttemptIdentifier, this.vertexName);
    }

    @Override // org.apache.tez.dag.api.VertexManagerPlugin
    public void onVertexManagerEventReceived(VertexManagerEvent vertexManagerEvent) {
    }

    @Override // org.apache.tez.dag.api.VertexManagerPlugin
    public void onRootVertexInitialized(String str, InputDescriptor inputDescriptor, List<Event> list) {
    }

    private void scheduleTasks() {
        if (this.onVertexStartedDone.get() && !this.tasksScheduled.get() && canScheduleTasks()) {
            this.tasksScheduled.compareAndSet(false, true);
            ArrayList newArrayListWithCapacity = Lists.newArrayListWithCapacity(this.managedTasks);
            for (int i = 0; i < this.managedTasks; i++) {
                newArrayListWithCapacity.add(VertexManagerPluginContext.ScheduleTaskRequest.create(i, null));
            }
            if (newArrayListWithCapacity.isEmpty()) {
                return;
            }
            LOG.info("Starting {} tasks in {}.", Integer.valueOf(newArrayListWithCapacity.size()), this.vertexName);
            getContext().scheduleTasks(newArrayListWithCapacity);
        }
    }

    private boolean canScheduleTasks() {
        if (this.edgeTriggerType.equals(EdgeProperty.ConcurrentEdgeTriggerType.SOURCE_VERTEX_CONFIGURED)) {
            return this.allSrcVerticesConfigured;
        }
        throw new TezUncheckedException("Only support SOURCE_VERTEX_CONFIGURED triggering type for now.");
    }

    public static ConcurrentInputVertexManagerConfigBuilder createConfigBuilder(@Nullable Configuration configuration) {
        return new ConcurrentInputVertexManagerConfigBuilder(configuration);
    }
}
