/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.graph;

import java.io.Serializable;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.forwardgroup.StreamNodeForwardGroup;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.DefaultStreamGraphContext;
import org.apache.flink.streaming.api.graph.NonChainedOutput;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamNode;
import org.apache.flink.streaming.api.graph.util.StreamEdgeUpdateRequestInfo;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.api.transformations.StreamExchangeMode;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.runtime.partitioner.ForwardForUnspecifiedPartitioner;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.assertj.core.api.AssertionsForClassTypes;
import org.junit.jupiter.api.Test;

/**
 * Tests for {@link DefaultStreamGraphContext}, exercising which {@link StreamEdgeUpdateRequestInfo}
 * requests {@code modifyStreamEdge} accepts and how accepted requests change the targeted
 * {@link StreamEdge}.
 */
class DefaultStreamGraphContextTest {

    /**
     * Requests a {@link ForwardForUnspecifiedPartitioner} on the source's first out edge once the
     * target node has parallelism 1. The first request must succeed and leave a
     * {@link ForwardPartitioner} on the edge; every later attempt with the same request must be
     * rejected.
     */
    @Test
    void testModifyStreamEdge() {
        StreamGraph streamGraph = this.createStreamGraphForModifyStreamEdgeTest();
        Map<Integer, StreamNodeForwardGroup> forwardGroupsByEndpointNodeIdCache = new HashMap<>();
        Map<Integer, Integer> frozenNodeToStartNodeMap = new HashMap<>();
        Map<Integer, Map<StreamEdge, NonChainedOutput>> opIntermediateOutputsCaches =
                new HashMap<>();
        DefaultStreamGraphContext streamGraphContext =
                createContext(
                        streamGraph,
                        forwardGroupsByEndpointNodeIdCache,
                        frozenNodeToStartNodeMap,
                        opIntermediateOutputsCaches);

        StreamNode sourceNode = firstSourceNode(streamGraph);
        StreamEdge targetEdge = sourceNode.getOutEdges().get(0);
        StreamNode targetNode = streamGraph.getStreamNode(targetEdge.getTargetId());
        // Align the target's parallelism with the source (1) before the forward groups are built.
        targetNode.setParallelism(1);
        registerSingletonForwardGroup(forwardGroupsByEndpointNodeIdCache, sourceNode);
        registerSingletonForwardGroup(forwardGroupsByEndpointNodeIdCache, targetNode);

        StreamEdgeUpdateRequestInfo forwardRequest = forwardPartitionerRequest(targetEdge);

        // First attempt: accepted, and the edge ends up with a forward partitioner.
        AssertionsForClassTypes.assertThat(applyRequest(streamGraphContext, forwardRequest))
                .isTrue();
        AssertionsForClassTypes.assertThat(targetEdge.getPartitioner())
                .isInstanceOf(ForwardPartitioner.class);

        // Repeating the identical request is rejected.
        AssertionsForClassTypes.assertThat(applyRequest(streamGraphContext, forwardRequest))
                .isFalse();

        // Rejected as well once the target node is recorded in the frozen-node map.
        // NOTE(review): the edge already carries the forward partitioner installed above, so this
        // assertion alone may not isolate the frozen-node condition - confirm against
        // DefaultStreamGraphContext.
        frozenNodeToStartNodeMap.put(targetEdge.getTargetId(), targetEdge.getTargetId());
        AssertionsForClassTypes.assertThat(applyRequest(streamGraphContext, forwardRequest))
                .isFalse();

        // Rejected as well when the cached output of the source node is shared by two edges.
        NonChainedOutput nonChainedOutput =
                new NonChainedOutput(
                        targetEdge.supportsUnalignedCheckpoints(),
                        targetEdge.getSourceId(),
                        targetNode.getParallelism(),
                        targetNode.getMaxParallelism(),
                        targetEdge.getBufferTimeout(),
                        false,
                        new IntermediateDataSetID(),
                        targetEdge.getOutputTag(),
                        targetEdge.getPartitioner(),
                        ResultPartitionType.BLOCKING);
        opIntermediateOutputsCaches.put(
                targetEdge.getSourceId(),
                Map.of(
                        targetEdge,
                        nonChainedOutput,
                        targetNode.getOutEdges().get(0),
                        nonChainedOutput));
        // Same key/value as the put above; kept so this scenario's setup reads on its own.
        frozenNodeToStartNodeMap.put(targetEdge.getTargetId(), targetEdge.getTargetId());
        AssertionsForClassTypes.assertThat(applyRequest(streamGraphContext, forwardRequest))
                .isFalse();
    }

    /**
     * Requests a {@link ForwardForUnspecifiedPartitioner} in three situations and checks that the
     * request is accepted each time while the edge keeps a {@link RescalePartitioner}: with the
     * graph as built, after the target's parallelism is set to 1 (forward groups were created
     * before that change), and after the source node is recorded in the frozen-node map.
     */
    @Test
    void testModifyToForwardPartitionerButResultIsRescale() {
        StreamGraph streamGraph = this.createStreamGraphForModifyStreamEdgeTest();
        Map<Integer, StreamNodeForwardGroup> forwardGroupsByEndpointNodeIdCache = new HashMap<>();
        Map<Integer, Integer> frozenNodeToStartNodeMap = new HashMap<>();
        Map<Integer, Map<StreamEdge, NonChainedOutput>> opIntermediateOutputsCaches =
                new HashMap<>();
        DefaultStreamGraphContext streamGraphContext =
                createContext(
                        streamGraph,
                        forwardGroupsByEndpointNodeIdCache,
                        frozenNodeToStartNodeMap,
                        opIntermediateOutputsCaches);

        StreamNode sourceNode = firstSourceNode(streamGraph);
        StreamEdge targetEdge = sourceNode.getOutEdges().get(0);
        StreamNode targetNode = streamGraph.getStreamNode(targetEdge.getTargetId());
        registerSingletonForwardGroup(forwardGroupsByEndpointNodeIdCache, sourceNode);
        registerSingletonForwardGroup(forwardGroupsByEndpointNodeIdCache, targetNode);

        StreamEdgeUpdateRequestInfo forwardRequest = forwardPartitionerRequest(targetEdge);

        // Graph as built: source parallelism 1, target parallelism 2.
        AssertionsForClassTypes.assertThat(applyRequest(streamGraphContext, forwardRequest))
                .isTrue();
        AssertionsForClassTypes.assertThat(targetEdge.getPartitioner())
                .isInstanceOf(RescalePartitioner.class);

        // Target parallelism changed to 1 only after its forward group was registered.
        targetNode.setParallelism(1);
        AssertionsForClassTypes.assertThat(applyRequest(streamGraphContext, forwardRequest))
                .isTrue();
        AssertionsForClassTypes.assertThat(targetEdge.getPartitioner())
                .isInstanceOf(RescalePartitioner.class);

        // Source node recorded in the frozen-node map (mapped to itself).
        frozenNodeToStartNodeMap.put(
                streamGraph.getSourceIDs().iterator().next(),
                streamGraph.getSourceIDs().iterator().next());
        AssertionsForClassTypes.assertThat(applyRequest(streamGraphContext, forwardRequest))
                .isTrue();
        AssertionsForClassTypes.assertThat(targetEdge.getPartitioner())
                .isInstanceOf(RescalePartitioner.class);
    }

    /**
     * Checks that the intra-input key correlation flag of a join input edge starts out {@code
     * true} and can be switched off and back on through {@code modifyStreamEdge}.
     */
    @Test
    void testModifyIntraInputKeyCorrelation() {
        StreamGraph streamGraph = this.createStreamGraphWithCorrelatedInputs();
        DefaultStreamGraphContext streamGraphContext =
                createContext(streamGraph, new HashMap<>(), new HashMap<>(), new HashMap<>());

        StreamEdge targetEdge = firstSourceNode(streamGraph).getOutEdges().get(0);
        AssertionsForClassTypes.assertThat(targetEdge.areInterInputsKeysCorrelated()).isTrue();
        AssertionsForClassTypes.assertThat(targetEdge.isIntraInputKeyCorrelated()).isTrue();

        // Switch the flag off ...
        AssertionsForClassTypes.assertThat(
                        applyRequest(
                                streamGraphContext,
                                updateRequestFor(targetEdge).withIntraInputKeyCorrelated(false)))
                .isTrue();
        AssertionsForClassTypes.assertThat(targetEdge.isIntraInputKeyCorrelated()).isFalse();

        // ... and back on again.
        AssertionsForClassTypes.assertThat(
                        applyRequest(
                                streamGraphContext,
                                updateRequestFor(targetEdge).withIntraInputKeyCorrelated(true)))
                .isTrue();
        AssertionsForClassTypes.assertThat(targetEdge.isIntraInputKeyCorrelated()).isTrue();
    }

    /**
     * Builds the graph {@code source(parallelism 1) -> rescale -> map(parallelism 2) -> rescale ->
     * print}.
     */
    private StreamGraph createStreamGraphForModifyStreamEdgeTest() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Integer> sourceDataStream = env.fromData(1, 2, 3).setParallelism(1);
        DataStream<Integer> partitionAfterSourceDataStream =
                new DataStream<>(
                        env,
                        new PartitionTransformation<>(
                                sourceDataStream.getTransformation(),
                                new RescalePartitioner<>(),
                                StreamExchangeMode.PIPELINED));
        SingleOutputStreamOperator<Integer> mapDataStream =
                partitionAfterSourceDataStream.map(value -> value).setParallelism(2);
        DataStream<Integer> partitionAfterMapDataStream =
                new DataStream<>(
                        env,
                        new PartitionTransformation<>(
                                mapDataStream.getTransformation(),
                                new RescalePartitioner<>(),
                                StreamExchangeMode.PIPELINED));
        partitionAfterMapDataStream.print();
        return env.getStreamGraph();
    }

    /**
     * Builds a graph in which two tuple streams, each keyed by field {@code f0}, are joined on
     * {@code f0} in 1 ms tumbling event-time windows and the concatenated {@code f1} values are
     * printed.
     */
    private StreamGraph createStreamGraphWithCorrelatedInputs() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        KeyedStream<Tuple2<Integer, String>, Integer> streamA =
                env.fromData(new Tuple2<>(1, "a1"), new Tuple2<>(2, "a2"), new Tuple2<>(3, "a3"))
                        .keyBy(value -> value.f0);
        KeyedStream<Tuple2<Integer, String>, Integer> streamB =
                env.fromData(new Tuple2<>(1, "b1"), new Tuple2<>(2, "b2"), new Tuple2<>(3, "b3"))
                        .keyBy(value -> value.f0);
        SingleOutputStreamOperator<String> joinedStream =
                streamA.join(streamB)
                        .where(v -> v.f0)
                        .equalTo(v -> v.f0)
                        .window(TumblingEventTimeWindows.of(Duration.ofMillis(1L)))
                        .apply(
                                (first, second) -> first.f1 + second.f1,
                                BasicTypeInfo.STRING_TYPE_INFO);
        joinedStream.print();
        return env.getStreamGraph();
    }

    /**
     * Creates a context over {@code streamGraph} that shares the given caches, so a test can
     * mutate them after construction. The remaining two collaborators start empty and the current
     * thread's context class loader is used.
     */
    private static DefaultStreamGraphContext createContext(
            StreamGraph streamGraph,
            Map<Integer, StreamNodeForwardGroup> forwardGroupsByEndpointNodeIdCache,
            Map<Integer, Integer> frozenNodeToStartNodeMap,
            Map<Integer, Map<StreamEdge, NonChainedOutput>> opIntermediateOutputsCaches) {
        return new DefaultStreamGraphContext(
                streamGraph,
                forwardGroupsByEndpointNodeIdCache,
                frozenNodeToStartNodeMap,
                opIntermediateOutputsCaches,
                new HashMap<>(),
                new HashSet<>(),
                Thread.currentThread().getContextClassLoader());
    }

    /** Returns the node for the first id yielded by {@code streamGraph.getSourceIDs()}. */
    private static StreamNode firstSourceNode(StreamGraph streamGraph) {
        return streamGraph.getStreamNode(streamGraph.getSourceIDs().iterator().next());
    }

    /** Registers a forward group containing only {@code node}, keyed by the node's id. */
    private static void registerSingletonForwardGroup(
            Map<Integer, StreamNodeForwardGroup> forwardGroupsByEndpointNodeIdCache,
            StreamNode node) {
        forwardGroupsByEndpointNodeIdCache.put(
                node.getId(), new StreamNodeForwardGroup(Collections.singleton(node)));
    }

    /** Creates an update request addressing {@code edge} by edge id, source id and target id. */
    private static StreamEdgeUpdateRequestInfo updateRequestFor(StreamEdge edge) {
        return new StreamEdgeUpdateRequestInfo(
                edge.getEdgeId(), edge.getSourceId(), edge.getTargetId());
    }

    /** Creates a request asking for a {@link ForwardForUnspecifiedPartitioner} on {@code edge}. */
    private static StreamEdgeUpdateRequestInfo forwardPartitionerRequest(StreamEdge edge) {
        return updateRequestFor(edge)
                .withOutputPartitioner(new ForwardForUnspecifiedPartitioner<>());
    }

    /** Submits {@code request} as a single-element list and returns the context's verdict. */
    private static boolean applyRequest(
            DefaultStreamGraphContext context, StreamEdgeUpdateRequestInfo request) {
        return context.modifyStreamEdge(Collections.singletonList(request));
    }
}

