/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.dag.library.vertexmanager;

import com.google.common.collect.Lists;
import com.google.common.collect.UnmodifiableIterator;
import com.google.common.primitives.Ints;
import java.io.IOException;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import javax.annotation.Nullable;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.common.Preconditions;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.VertexManagerPluginContext;
import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
import org.apache.tez.dag.library.vertexmanager.DestinationTaskInputsProperty;
import org.apache.tez.dag.library.vertexmanager.FairEdgeConfiguration;
import org.apache.tez.dag.library.vertexmanager.FairShuffleEdgeManager;
import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManagerBase;
import org.apache.tez.runtime.api.TaskAttemptIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Public
@InterfaceStability.Evolving
public class FairShuffleVertexManager
extends ShuffleVertexManagerBase {
    private static final Logger LOG = LoggerFactory.getLogger(FairShuffleVertexManager.class);
    public static final String TEZ_FAIR_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE = "tez.fair-shuffle-vertex-manager.desired-task-input-size";
    public static final long TEZ_FAIR_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE_DEFAULT = 100L * MB;
    public static final String TEZ_FAIR_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL = "tez.fair-shuffle-vertex-manager.enable.auto-parallel";
    public static final String TEZ_FAIR_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL_DEFAULT = FairRoutingType.NONE.getType();
    public static final String TEZ_FAIR_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION = "tez.fair-shuffle-vertex-manager.min-src-fraction";
    public static final float TEZ_FAIR_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION_DEFAULT = 0.25f;
    public static final String TEZ_FAIR_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION = "tez.fair-shuffle-vertex-manager.max-src-fraction";
    public static final float TEZ_FAIR_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION_DEFAULT = 0.75f;
    FairShuffleVertexManagerConfig mgrConfig;

    @Override
    ShuffleVertexManagerBase.SourceVertexInfo createSourceVertexInfo(EdgeProperty edgeProperty, int numTasks) {
        return new FairSourceVertexInfo(edgeProperty, numTasks);
    }

    public FairShuffleVertexManager(VertexManagerPluginContext context) {
        super(context);
    }

    @Override
    protected void onVertexStartedCheck() {
        super.onVertexStartedCheck();
        if (this.bipartiteSources > 1 && this.mgrConfig.getFairRoutingType().fairParallelismEnabled()) {
            throw new TezUncheckedException("Having more than one destination task process same partition(s) only works with one bipartite source.");
        }
    }

    static long ceil(long a, long b) {
        return (a + (b - 1L)) / b;
    }

    public long[] estimatePartitionSize() {
        long[] estimatedPartitionOutputSize;
        block4: {
            int i;
            int numOfPartitions;
            block3: {
                boolean partitionStatsReported = false;
                numOfPartitions = this.pendingTasks.size();
                estimatedPartitionOutputSize = new long[numOfPartitions];
                for (i = 0; i < numOfPartitions; ++i) {
                    if (this.getCurrentlyKnownStatsAtIndex(i) <= 0) continue;
                    partitionStatsReported = true;
                    break;
                }
                if (partitionStatsReported) break block3;
                if (numOfPartitions <= 0) break block4;
                long estimatedPerPartitionSize = this.getExpectedTotalBipartiteSourceTasksOutputSize().divide(BigInteger.valueOf(numOfPartitions)).longValue();
                for (int i2 = 0; i2 < numOfPartitions; ++i2) {
                    estimatedPartitionOutputSize[i2] = estimatedPerPartitionSize;
                }
                break block4;
            }
            for (i = 0; i < numOfPartitions; ++i) {
                estimatedPartitionOutputSize[i] = this.getExpectedStatsAtIndex(i);
                LOG.info("Partition index {} with size {}", (Object)i, (Object)estimatedPartitionOutputSize[i]);
            }
        }
        return estimatedPartitionOutputSize;
    }

    @Override
    public ShuffleVertexManagerBase.ReconfigVertexParams computeRouting() {
        int currentParallelism = this.pendingTasks.size();
        int finalTaskParallelism = 0;
        long[] estimatedPartitionOutputSize = this.estimatePartitionSize();
        for (Map.Entry<String, ShuffleVertexManagerBase.SourceVertexInfo> vInfo : this.getBipartiteInfo()) {
            FairSourceVertexInfo info = (FairSourceVertexInfo)vInfo.getValue();
            this.computeParallelism(estimatedPartitionOutputSize, info);
            if (finalTaskParallelism != 0) {
                Preconditions.checkState((finalTaskParallelism == info.getDestinationInputsProperties().size() ? 1 : 0) != 0, (Object)"the parallelism shall be the same for source vertices");
            }
            finalTaskParallelism = info.getDestinationInputsProperties().size();
            FairEdgeConfiguration fairEdgeConfig = new FairEdgeConfiguration(currentParallelism, info.getDestinationInputsProperties());
            EdgeManagerPluginDescriptor descriptor = EdgeManagerPluginDescriptor.create((String)FairShuffleEdgeManager.class.getName());
            descriptor.setUserPayload(fairEdgeConfig.getBytePayload());
            vInfo.getValue().newDescriptor = descriptor;
        }
        ShuffleVertexManagerBase.ReconfigVertexParams params = new ShuffleVertexManagerBase.ReconfigVertexParams(finalTaskParallelism, null);
        return params;
    }

    @Override
    void postReconfigVertex() {
    }

    @Override
    void processPendingTasks() {
    }

    private void computeParallelism(long[] estimatedPartitionOutputSize, FairSourceVertexInfo sourceVertexInfo) {
        PartitionsGroupingCalculator calculator = new PartitionsGroupingCalculator(estimatedPartitionOutputSize, sourceVertexInfo);
        calculator.compute();
    }

    @Override
    List<VertexManagerPluginContext.ScheduleTaskRequest> getTasksToSchedule(TaskAttemptIdentifier completedSourceAttempt) {
        float minSourceVertexCompletedTaskFraction = this.getMinSourceVertexCompletedTaskFraction();
        int numTasksToSchedule = this.getNumOfTasksToScheduleAndLog(minSourceVertexCompletedTaskFraction);
        if (numTasksToSchedule > 0) {
            boolean scheduleAll = numTasksToSchedule == this.pendingTasks.size();
            ArrayList tasksToSchedule = Lists.newArrayListWithCapacity((int)numTasksToSchedule);
            Iterator it = this.pendingTasks.iterator();
            FairSourceVertexInfo srcInfo = null;
            int srcTaskId = 0;
            if (completedSourceAttempt != null) {
                srcTaskId = completedSourceAttempt.getTaskIdentifier().getIdentifier();
                String srcVertexName = completedSourceAttempt.getTaskIdentifier().getVertexIdentifier().getName();
                srcInfo = (FairSourceVertexInfo)this.getSourceVertexInfo(srcVertexName);
            }
            while (it.hasNext() && numTasksToSchedule > 0) {
                DestinationTaskInputsProperty property;
                Integer taskIndex = ((ShuffleVertexManagerBase.PendingTaskInfo)it.next()).getIndex();
                if (!scheduleAll && this.config.isAutoParallelismEnabled() && srcInfo != null && srcInfo.getDestinationInputsProperties().size() > 0 && !(property = srcInfo.getDestinationInputsProperties().get(taskIndex)).isSourceTaskInRange(srcTaskId)) {
                    LOG.debug("completedSourceTaskIndex {} and taskIndex {} don't connect.", (Object)srcTaskId, (Object)taskIndex);
                    continue;
                }
                tasksToSchedule.add(VertexManagerPluginContext.ScheduleTaskRequest.create((int)taskIndex, null));
                it.remove();
                --numTasksToSchedule;
            }
            return tasksToSchedule;
        }
        return null;
    }

    @Override
    ShuffleVertexManagerBase.ShuffleVertexManagerBaseConfig initConfiguration() {
        float slowStartMinFraction = this.conf.getFloat(TEZ_FAIR_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION, 0.25f);
        FairRoutingType fairRoutingType = FairRoutingType.fromString(this.conf.get(TEZ_FAIR_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL, TEZ_FAIR_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL_DEFAULT));
        this.mgrConfig = new FairShuffleVertexManagerConfig(fairRoutingType.enabled(), this.conf.getLong(TEZ_FAIR_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE, TEZ_FAIR_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE_DEFAULT), slowStartMinFraction, this.conf.getFloat(TEZ_FAIR_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION, Math.max(slowStartMinFraction, 0.75f)), fairRoutingType);
        return this.mgrConfig;
    }

    public static FairShuffleVertexManagerConfigBuilder createConfigBuilder(@Nullable Configuration conf) {
        return new FairShuffleVertexManagerConfigBuilder(conf);
    }

    static class FairSourceVertexInfo
    extends ShuffleVertexManagerBase.SourceVertexInfo {
        private final HashMap<Integer, DestinationTaskInputsProperty> destinationInputsProperties = new HashMap();

        FairSourceVertexInfo(EdgeProperty edgeProperty, int totalTasksToSchedule) {
            super(edgeProperty, totalTasksToSchedule);
        }

        public HashMap<Integer, DestinationTaskInputsProperty> getDestinationInputsProperties() {
            return this.destinationInputsProperties;
        }
    }

    static class FairShuffleVertexManagerConfig
    extends ShuffleVertexManagerBase.ShuffleVertexManagerBaseConfig {
        final FairRoutingType fairRoutingType;

        public FairShuffleVertexManagerConfig(boolean enableAutoParallelism, long desiredTaskInputDataSize, float slowStartMinFraction, float slowStartMaxFraction, FairRoutingType fairRoutingType) {
            super(enableAutoParallelism, desiredTaskInputDataSize, slowStartMinFraction, slowStartMaxFraction);
            this.fairRoutingType = fairRoutingType;
            LOG.info("fairRoutingType {}", (Object)this.fairRoutingType);
        }

        FairRoutingType getFairRoutingType() {
            return this.fairRoutingType;
        }
    }

    public static enum FairRoutingType {
        NONE("none"),
        REDUCE_PARALLELISM("reduce_parallelism"),
        FAIR_PARALLELISM("fair_parallelism");

        private final String type;

        private FairRoutingType(String type) {
            this.type = type;
        }

        public final String getType() {
            return this.type;
        }

        public boolean reduceParallelismEnabled() {
            return this.equals((Object)REDUCE_PARALLELISM);
        }

        public boolean fairParallelismEnabled() {
            return this.equals((Object)FAIR_PARALLELISM);
        }

        public boolean enabled() {
            return !this.equals((Object)NONE);
        }

        public static FairRoutingType fromString(String type) {
            if (type != null) {
                for (FairRoutingType b : FairRoutingType.values()) {
                    if (!type.equalsIgnoreCase(b.type)) continue;
                    return b;
                }
            }
            throw new IllegalArgumentException("Invalid type " + type);
        }
    }

    private class PartitionsGroupingCalculator
    implements Iterable<DestinationTaskInputsProperty> {
        private final FairSourceVertexInfo sourceVertexInfo;
        private long[] estimatedPartitionOutputSize;
        private long sizeOfPartitions = 0L;
        private int numOfPartitions = 0;
        private int firstPartitionId = 0;
        private int numOfBaseSourceTasks = 0;
        private int numOfBaseDestinationTasks = 0;

        public PartitionsGroupingCalculator(long[] estimatedPartitionOutputSize, FairSourceVertexInfo sourceVertexInfo) {
            this.estimatedPartitionOutputSize = estimatedPartitionOutputSize;
            this.sourceVertexInfo = sourceVertexInfo;
        }

        private void startNextPartitionsGroup() {
            this.firstPartitionId += this.numOfPartitions;
            this.sizeOfPartitions = 0L;
            this.numOfPartitions = 0;
            this.numOfBaseSourceTasks = 0;
            this.numOfBaseDestinationTasks = 0;
        }

        private int getNextPartitionId() {
            return this.firstPartitionId + this.numOfPartitions;
        }

        private void addNextPartition() {
            if (this.hasPartitionsLeft()) {
                this.sizeOfPartitions += this.estimatedPartitionOutputSize[this.getNextPartitionId()];
                ++this.numOfPartitions;
            }
        }

        private boolean hasPartitionsLeft() {
            return this.getNextPartitionId() < this.estimatedPartitionOutputSize.length;
        }

        private long getCurrentAndNextPartitionSize() {
            return this.hasPartitionsLeft() ? this.sizeOfPartitions + this.estimatedPartitionOutputSize[this.getNextPartitionId()] : this.sizeOfPartitions;
        }

        private boolean computeSourceTasksGrouping() {
            boolean finalizeCurrentPartitions = true;
            int groupCount = Ints.checkedCast((long)FairShuffleVertexManager.ceil(this.getCurrentAndNextPartitionSize(), FairShuffleVertexManager.this.config.getDesiredTaskInputDataSize()));
            if (groupCount <= 1) {
                this.addNextPartition();
                if (!this.hasPartitionsLeft()) {
                    this.numOfBaseDestinationTasks = 1;
                    this.numOfBaseSourceTasks = this.sourceVertexInfo.numTasks;
                } else {
                    finalizeCurrentPartitions = false;
                }
            } else if (this.numOfPartitions == 0) {
                this.addNextPartition();
                if (FairShuffleVertexManager.this.mgrConfig.getFairRoutingType().reduceParallelismEnabled()) {
                    this.numOfBaseDestinationTasks = 1;
                    this.numOfBaseSourceTasks = this.sourceVertexInfo.numTasks;
                } else if (this.sourceVertexInfo.numTasks >= groupCount) {
                    this.numOfBaseDestinationTasks = groupCount - this.sourceVertexInfo.numTasks % groupCount;
                    this.numOfBaseSourceTasks = this.sourceVertexInfo.numTasks / groupCount;
                } else {
                    this.numOfBaseDestinationTasks = this.sourceVertexInfo.numTasks;
                    this.numOfBaseSourceTasks = 1;
                }
            } else {
                this.numOfBaseDestinationTasks = 1;
                this.numOfBaseSourceTasks = this.sourceVertexInfo.numTasks;
            }
            return finalizeCurrentPartitions;
        }

        @Override
        public Iterator<DestinationTaskInputsProperty> iterator() {
            return new UnmodifiableIterator<DestinationTaskInputsProperty>(){
                private int j = 0;
                private boolean visitedAtLeastOnce = false;
                private int groupIndex = 0;

                private int getNumOfSourceTasks() {
                    return this.groupIndex++ < PartitionsGroupingCalculator.this.numOfBaseDestinationTasks ? PartitionsGroupingCalculator.this.numOfBaseSourceTasks : PartitionsGroupingCalculator.this.numOfBaseSourceTasks + 1;
                }

                public boolean hasNext() {
                    return this.j < ((PartitionsGroupingCalculator)PartitionsGroupingCalculator.this).sourceVertexInfo.numTasks || !this.visitedAtLeastOnce;
                }

                public DestinationTaskInputsProperty next() {
                    if (this.hasNext()) {
                        this.visitedAtLeastOnce = true;
                        int start = this.j;
                        int numOfSourceTasks = this.getNumOfSourceTasks();
                        this.j += numOfSourceTasks;
                        return new DestinationTaskInputsProperty(PartitionsGroupingCalculator.this.firstPartitionId, PartitionsGroupingCalculator.this.numOfPartitions, start, numOfSourceTasks);
                    }
                    throw new NoSuchElementException();
                }
            };
        }

        public void compute() {
            int destinationIndex = 0;
            while (this.hasPartitionsLeft()) {
                if (!this.computeSourceTasksGrouping()) continue;
                for (DestinationTaskInputsProperty property : this) {
                    this.sourceVertexInfo.getDestinationInputsProperties().put(destinationIndex, property);
                    LOG.info("Destination Index {}: Input Property {}", (Object)(++destinationIndex), (Object)property);
                }
                this.startNextPartitionsGroup();
            }
        }
    }

    public static final class FairShuffleVertexManagerConfigBuilder {
        private final Configuration conf;

        private FairShuffleVertexManagerConfigBuilder(@Nullable Configuration conf) {
            this.conf = conf == null ? new Configuration(false) : conf;
        }

        public FairShuffleVertexManagerConfigBuilder setAutoParallelism(FairRoutingType fairRoutingType) {
            this.conf.set(FairShuffleVertexManager.TEZ_FAIR_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL, fairRoutingType.toString());
            return this;
        }

        public FairShuffleVertexManagerConfigBuilder setSlowStartMinSrcCompletionFraction(float minFraction) {
            this.conf.setFloat(FairShuffleVertexManager.TEZ_FAIR_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION, minFraction);
            return this;
        }

        public FairShuffleVertexManagerConfigBuilder setSlowStartMaxSrcCompletionFraction(float maxFraction) {
            this.conf.setFloat(FairShuffleVertexManager.TEZ_FAIR_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION, maxFraction);
            return this;
        }

        public FairShuffleVertexManagerConfigBuilder setDesiredTaskInputSize(long desiredTaskInputSize) {
            this.conf.setLong(FairShuffleVertexManager.TEZ_FAIR_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE, desiredTaskInputSize);
            return this;
        }

        public VertexManagerPluginDescriptor build() {
            VertexManagerPluginDescriptor desc = VertexManagerPluginDescriptor.create((String)FairShuffleVertexManager.class.getName());
            try {
                return (VertexManagerPluginDescriptor)desc.setUserPayload(TezUtils.createUserPayloadFromConf((Configuration)this.conf));
            }
            catch (IOException e) {
                throw new TezUncheckedException((Throwable)e);
            }
        }
    }
}

