package org.apache.flink.streaming.runtime.io.benchmark;

import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.netty.NettyConfig;
import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionBuilder;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateFactory;
import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.taskmanager.InputGateWithMetrics;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.util.ConfigurationParserUtils;
import org.apache.flink.runtime.util.NettyShuffleDescriptorBuilder;
import org.apache.flink.util.ExceptionUtils;

/* loaded from: input_file:org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.class */
public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> {
    private static final InetAddress LOCAL_ADDRESS;
    protected NettyShuffleEnvironment senderEnv;
    protected NettyShuffleEnvironment receiverEnv;
    protected int channels;
    protected ResultPartitionID[] partitionIds;
    private int dataPort;
    private SingleInputGateFactory gateFactory;
    private final ResourceID location = ResourceID.generate();
    protected final JobID jobId = new JobID();
    protected final IntermediateDataSetID dataSetID = new IntermediateDataSetID();
    protected boolean localMode = false;

    public void setUp(int i, int i2, boolean z, int i3, int i4) throws Exception {
        setUp(i, i2, z, i3, i4, new Configuration());
    }

    public void setUp(int i, int i2, boolean z, int i3, int i4, Configuration configuration) throws Exception {
        this.localMode = z;
        this.channels = i2;
        this.partitionIds = new ResultPartitionID[i];
        if (i3 == -1) {
            i3 = Math.max(2048, i * i2 * 4);
        }
        if (i4 == -1) {
            i4 = Math.max(2048, i * i2 * 4);
        }
        this.senderEnv = createShuffleEnvironment(i3, configuration);
        this.dataPort = this.senderEnv.start();
        if (z && i3 == i4) {
            this.receiverEnv = this.senderEnv;
        } else {
            this.receiverEnv = createShuffleEnvironment(i4, configuration);
            this.receiverEnv.start();
        }
        this.gateFactory = new SingleInputGateBenchmarkFactory(this.location, this.receiverEnv.getConfiguration(), this.receiverEnv.getConnectionManager(), this.receiverEnv.getResultPartitionManager(), new TaskEventDispatcher(), this.receiverEnv.getNetworkBufferPool());
        generatePartitionIds();
    }

    public void tearDown() {
        NettyShuffleEnvironment nettyShuffleEnvironment = this.senderEnv;
        nettyShuffleEnvironment.getClass();
        ExceptionUtils.suppressExceptions(nettyShuffleEnvironment::close);
        NettyShuffleEnvironment nettyShuffleEnvironment2 = this.receiverEnv;
        nettyShuffleEnvironment2.getClass();
        ExceptionUtils.suppressExceptions(nettyShuffleEnvironment2::close);
    }

    public SerializingLongReceiver createReceiver() throws Exception {
        SerializingLongReceiver serializingLongReceiver = new SerializingLongReceiver(createInputGate(new TaskManagerLocation(ResourceID.generate(), LOCAL_ADDRESS, this.dataPort)), this.channels * this.partitionIds.length);
        serializingLongReceiver.start();
        return serializingLongReceiver;
    }

    public ResultPartitionWriter createResultPartitionWriter(int i) throws Exception {
        ResultPartition build = new ResultPartitionBuilder().setResultPartitionId(this.partitionIds[i]).setResultPartitionType(ResultPartitionType.PIPELINED_BOUNDED).setNumberOfSubpartitions(this.channels).setResultPartitionManager(this.senderEnv.getResultPartitionManager()).setupBufferPoolFactoryFromNettyShuffleEnvironment(this.senderEnv).build();
        build.setup();
        return build;
    }

    private void generatePartitionIds() throws Exception {
        for (int i = 0; i < this.partitionIds.length; i++) {
            this.partitionIds[i] = new ResultPartitionID();
        }
    }

    private NettyShuffleEnvironment createShuffleEnvironment(int i, Configuration configuration) throws Exception {
        return new NettyShuffleEnvironmentBuilder().setNumNetworkBuffers(i).setNettyConfig(new NettyConfig(LOCAL_ADDRESS, 0, ConfigurationParserUtils.getPageSize(configuration), ConfigurationParserUtils.getSlot(configuration), configuration)).build();
    }

    private InputGate createInputGate(TaskManagerLocation taskManagerLocation) throws Exception {
        InputGate[] inputGateArr = new IndexedInputGate[this.partitionIds.length];
        for (int i = 0; i < inputGateArr.length; i++) {
            IndexedInputGate createInputGateWithMetrics = createInputGateWithMetrics(this.gateFactory, createInputGateDeploymentDescriptor(taskManagerLocation, i, this.location), i);
            createInputGateWithMetrics.setup();
            inputGateArr[i] = createInputGateWithMetrics;
        }
        return inputGateArr.length > 1 ? new UnionInputGate(inputGateArr) : inputGateArr[0];
    }

    private InputGateDeploymentDescriptor createInputGateDeploymentDescriptor(TaskManagerLocation taskManagerLocation, int i, ResourceID resourceID) throws IOException {
        ShuffleDescriptor[] shuffleDescriptorArr = new ShuffleDescriptor[this.channels];
        for (int i2 = 0; i2 < this.channels; i2++) {
            shuffleDescriptorArr[i2] = createShuffleDescriptor(this.localMode, this.partitionIds[i], resourceID, taskManagerLocation, i2);
        }
        return new InputGateDeploymentDescriptor(this.dataSetID, ResultPartitionType.PIPELINED_BOUNDED, 0, shuffleDescriptorArr);
    }

    private IndexedInputGate createInputGateWithMetrics(SingleInputGateFactory singleInputGateFactory, InputGateDeploymentDescriptor inputGateDeploymentDescriptor, int i) {
        TaskMetricGroup createUnregisteredTaskMetricGroup = UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
        return new InputGateWithMetrics(singleInputGateFactory.create(this.receiverEnv.createShuffleIOOwnerContext("receiving task[" + i + "]", createUnregisteredTaskMetricGroup.executionId(), createUnregisteredTaskMetricGroup), i, inputGateDeploymentDescriptor, SingleInputGateBuilder.NO_OP_PRODUCER_CHECKER, InputChannelTestUtils.newUnregisteredInputChannelMetrics()), new SimpleCounter());
    }

    private static ShuffleDescriptor createShuffleDescriptor(boolean z, ResultPartitionID resultPartitionID, ResourceID resourceID, TaskManagerLocation taskManagerLocation, int i) {
        NettyShuffleDescriptorBuilder connectionIndex = NettyShuffleDescriptorBuilder.newBuilder().setId(resultPartitionID).setProducerInfoFromTaskManagerLocation(taskManagerLocation).setConnectionIndex(i);
        return z ? connectionIndex.setProducerLocation(resourceID).buildLocal() : connectionIndex.buildRemote();
    }

    static {
        try {
            LOCAL_ADDRESS = InetAddress.getLocalHost();
        } catch (UnknownHostException e) {
            throw new Error(e);
        }
    }
}
