package org.apache.flink.runtime.checkpoint.channel;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.core.memory.MemorySegmentProvider;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.StateObjectCollection;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition;
import org.apache.flink.runtime.io.network.partition.NoOpBufferAvailablityListener;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionBuilder;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.InputChannelStateHandle;
import org.apache.flink.runtime.state.ResultSubpartitionStateHandle;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.shaded.guava18.com.google.common.io.Closer;
import org.apache.flink.util.function.ThrowingConsumer;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/flink/runtime/checkpoint/channel/SequentialChannelStateReaderImplTest.class */
public class SequentialChannelStateReaderImplTest {
    private final ChannelStateSerializer serializer = new ChannelStateSerializerImpl();
    private final Random random = new Random();
    private final int parLevel;
    private final int statePartsPerChannel;
    private final int stateBytesPerPart;
    private final int bufferSize;
    private final int stateParLevel;
    private final int buffersPerChannel;

    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Object[], java.lang.Object[][]] */
    @Parameterized.Parameters(name = "{0}: stateParLevel={1}, statePartsPerChannel={2}, stateBytesPerPart={3},  parLevel={4}, bufferSize={5}")
    public static Object[][] parameters() {
        return new Object[]{new Object[]{"NoStateAndNoChannels", 0, 0, 0, 0, 0}, new Object[]{"NoState", 0, 10, 10, 10, 10}, new Object[]{"ReadPermutedStateWithEqualBuffer", 10, 10, 10, 10, 10}, new Object[]{"ReadPermutedStateWithReducedBuffer", 10, 10, 10, 20, 10}, new Object[]{"ReadPermutedStateWithIncreasedBuffer", 10, 10, 10, 10, 20}};
    }

    public SequentialChannelStateReaderImplTest(String str, int i, int i2, int i3, int i4, int i5) {
        this.parLevel = i4;
        this.statePartsPerChannel = i2;
        this.stateBytesPerPart = i3;
        this.bufferSize = i5;
        this.stateParLevel = i;
        this.buffersPerChannel = Math.max(1, i2 * (i5 >= i3 ? 1 : i3 / i5));
    }

    @Test
    public void testReadPermutedState() throws Exception {
        Map<InputChannelInfo, List<byte[]>> generateState = generateState((v1, v2) -> {
            return new InputChannelInfo(v1, v2);
        });
        Map<ResultSubpartitionInfo, List<byte[]>> generateState2 = generateState((v1, v2) -> {
            return new ResultSubpartitionInfo(v1, v2);
        });
        SequentialChannelStateReaderImpl sequentialChannelStateReaderImpl = new SequentialChannelStateReaderImpl(buildSnapshot(writePermuted(generateState, generateState2)));
        withResultPartitions(bufferWritingResultPartitionArr -> {
            sequentialChannelStateReaderImpl.readOutputData(bufferWritingResultPartitionArr, false);
            assertBuffersEquals(generateState2, collectBuffers(bufferWritingResultPartitionArr));
        });
        withInputGates(inputGateArr -> {
            sequentialChannelStateReaderImpl.readInputData(inputGateArr);
            assertBuffersEquals(generateState, collectBuffers(inputGateArr));
            assertConsumed(inputGateArr);
        });
    }

    private Map<ResultSubpartitionInfo, List<Buffer>> collectBuffers(BufferWritingResultPartition[] bufferWritingResultPartitionArr) throws IOException {
        HashMap hashMap = new HashMap();
        for (BufferWritingResultPartition bufferWritingResultPartition : bufferWritingResultPartitionArr) {
            for (int i = 0; i < bufferWritingResultPartition.getNumberOfSubpartitions(); i++) {
                ResultSubpartitionInfo subpartitionInfo = bufferWritingResultPartition.getAllPartitions()[i].getSubpartitionInfo();
                ResultSubpartitionView createSubpartitionView = bufferWritingResultPartition.createSubpartitionView(subpartitionInfo.getSubPartitionIdx(), new NoOpBufferAvailablityListener());
                ResultSubpartition.BufferAndBacklog nextBuffer = createSubpartitionView.getNextBuffer();
                while (true) {
                    ResultSubpartition.BufferAndBacklog bufferAndBacklog = nextBuffer;
                    if (bufferAndBacklog != null) {
                        if (bufferAndBacklog.buffer().isBuffer()) {
                            ((List) hashMap.computeIfAbsent(subpartitionInfo, resultSubpartitionInfo -> {
                                return new ArrayList();
                            })).add(bufferAndBacklog.buffer());
                        }
                        nextBuffer = createSubpartitionView.getNextBuffer();
                    }
                }
            }
        }
        return hashMap;
    }

    private Map<InputChannelInfo, List<Buffer>> collectBuffers(InputGate[] inputGateArr) throws Exception {
        HashMap hashMap = new HashMap();
        for (InputGate inputGate : inputGateArr) {
            Optional pollNext = inputGate.pollNext();
            while (true) {
                Optional optional = pollNext;
                if (optional.isPresent()) {
                    if (((BufferOrEvent) optional.get()).isBuffer()) {
                        ((List) hashMap.computeIfAbsent(((BufferOrEvent) optional.get()).getChannelInfo(), inputChannelInfo -> {
                            return new ArrayList();
                        })).add(((BufferOrEvent) optional.get()).getBuffer());
                    }
                    pollNext = inputGate.pollNext();
                }
            }
        }
        return hashMap;
    }

    private void assertConsumed(InputGate[] inputGateArr) throws InterruptedException, ExecutionException {
        for (InputGate inputGate : inputGateArr) {
            Assert.assertTrue(inputGate.getStateConsumedFuture().isDone());
            inputGate.getStateConsumedFuture().get();
        }
    }

    private void withInputGates(ThrowingConsumer<InputGate[], Exception> throwingConsumer) throws Exception {
        SingleInputGate[] singleInputGateArr = new SingleInputGate[this.parLevel];
        int i = this.parLevel + (this.parLevel * this.parLevel * this.buffersPerChannel);
        MemorySegmentProvider networkBufferPool = new NetworkBufferPool(i, this.bufferSize);
        Closer create = Closer.create();
        Throwable th = null;
        try {
            networkBufferPool.getClass();
            create.register(networkBufferPool::destroy);
            networkBufferPool.getClass();
            create.register(networkBufferPool::destroyAllBufferPools);
            Closer create2 = Closer.create();
            Throwable th2 = null;
            for (int i2 = 0; i2 < this.parLevel; i2++) {
                try {
                    try {
                        singleInputGateArr[i2] = new SingleInputGateBuilder().setNumberOfChannels(this.parLevel).setSingleInputGateIndex(i2).setBufferPoolFactory(networkBufferPool.createBufferPool(1, this.buffersPerChannel)).setSegmentProvider(networkBufferPool).setChannelFactory((inputChannelBuilder, singleInputGate) -> {
                            return inputChannelBuilder.setNetworkBuffersPerChannel(this.buffersPerChannel).buildRemoteRecoveredChannel(singleInputGate);
                        }).build();
                        singleInputGateArr[i2].setup();
                        SingleInputGate singleInputGate2 = singleInputGateArr[i2];
                        singleInputGate2.getClass();
                        create2.register(singleInputGate2::close);
                    } catch (Throwable th3) {
                        th2 = th3;
                        throw th3;
                    }
                } catch (Throwable th4) {
                    if (create2 != null) {
                        if (th2 != null) {
                            try {
                                create2.close();
                            } catch (Throwable th5) {
                                th2.addSuppressed(th5);
                            }
                        } else {
                            create2.close();
                        }
                    }
                    throw th4;
                }
            }
            throwingConsumer.accept(singleInputGateArr);
            if (create2 != null) {
                if (0 != 0) {
                    try {
                        create2.close();
                    } catch (Throwable th6) {
                        th2.addSuppressed(th6);
                    }
                } else {
                    create2.close();
                }
            }
            Assert.assertEquals(i, networkBufferPool.getNumberOfAvailableMemorySegments());
            if (create != null) {
                if (0 == 0) {
                    create.close();
                    return;
                }
                try {
                    create.close();
                } catch (Throwable th7) {
                    th.addSuppressed(th7);
                }
            }
        } catch (Throwable th8) {
            if (create != null) {
                if (0 != 0) {
                    try {
                        create.close();
                    } catch (Throwable th9) {
                        th.addSuppressed(th9);
                    }
                } else {
                    create.close();
                }
            }
            throw th8;
        }
    }

    private void withResultPartitions(ThrowingConsumer<BufferWritingResultPartition[], Exception> throwingConsumer) throws Exception {
        int i = this.parLevel * this.parLevel * this.buffersPerChannel;
        NetworkBufferPool networkBufferPool = new NetworkBufferPool(i, this.bufferSize);
        ResultPartition[] resultPartitionArr = (BufferWritingResultPartition[]) IntStream.range(0, this.parLevel).mapToObj(i2 -> {
            return new ResultPartitionBuilder().setResultPartitionIndex(i2).setNumberOfSubpartitions(this.parLevel).setNetworkBufferPool(networkBufferPool).build();
        }).toArray(i3 -> {
            return new BufferWritingResultPartition[i3];
        });
        try {
            for (ResultPartition resultPartition : resultPartitionArr) {
                resultPartition.setup();
            }
            throwingConsumer.accept(resultPartitionArr);
            for (ResultPartition resultPartition2 : resultPartitionArr) {
                resultPartition2.close();
            }
            try {
                Assert.assertEquals(i, networkBufferPool.getNumberOfAvailableMemorySegments());
                networkBufferPool.destroyAllBufferPools();
                networkBufferPool.destroy();
            } finally {
            }
        } catch (Throwable th) {
            for (ResultPartition resultPartition3 : resultPartitionArr) {
                resultPartition3.close();
            }
            try {
                Assert.assertEquals(i, networkBufferPool.getNumberOfAvailableMemorySegments());
                networkBufferPool.destroyAllBufferPools();
                networkBufferPool.destroy();
                throw th;
            } finally {
            }
        }
    }

    private TaskStateSnapshot buildSnapshot(Tuple2<List<InputChannelStateHandle>, List<ResultSubpartitionStateHandle>> tuple2) {
        return new TaskStateSnapshot(Collections.singletonMap(new OperatorID(), OperatorSubtaskState.builder().setInputChannelState(new StateObjectCollection((Collection) tuple2.f0)).setResultSubpartitionState(new StateObjectCollection((Collection) tuple2.f1)).build()));
    }

    private <T> Map<T, List<byte[]>> generateState(BiFunction<Integer, Integer, T> biFunction) {
        return (Map) IntStream.range(0, this.stateParLevel).boxed().flatMap(num -> {
            return IntStream.range(0, this.stateParLevel).mapToObj(i -> {
                return biFunction.apply(num, Integer.valueOf(i));
            });
        }).collect(Collectors.toMap(Function.identity(), this::generateSingleChannelState));
    }

    private List<byte[]> generateSingleChannelState(Object obj) {
        return (List) IntStream.range(0, this.statePartsPerChannel).mapToObj(i -> {
            return randomStateBytes();
        }).collect(Collectors.toList());
    }

    private Tuple2<List<InputChannelStateHandle>, List<ResultSubpartitionStateHandle>> writePermuted(Map<InputChannelInfo, List<byte[]>> map, Map<ResultSubpartitionInfo, List<byte[]>> map2) throws IOException {
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        Throwable th = null;
        try {
            try {
                DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
                this.serializer.writeHeader(dataOutputStream);
                Map write = write(dataOutputStream, permute(map));
                Map write2 = write(dataOutputStream, permute(map2));
                ByteStreamStateHandle byteStreamStateHandle = new ByteStreamStateHandle("", byteArrayOutputStream.toByteArray());
                Tuple2<List<InputChannelStateHandle>, List<ResultSubpartitionStateHandle>> of = Tuple2.of(write.entrySet().stream().map(entry -> {
                    return new InputChannelStateHandle((InputChannelInfo) entry.getKey(), byteStreamStateHandle, (List) entry.getValue());
                }).collect(Collectors.toList()), write2.entrySet().stream().map(entry2 -> {
                    return new ResultSubpartitionStateHandle((ResultSubpartitionInfo) entry2.getKey(), byteStreamStateHandle, (List) entry2.getValue());
                }).collect(Collectors.toList()));
                if (byteArrayOutputStream != null) {
                    if (0 != 0) {
                        try {
                            byteArrayOutputStream.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        byteArrayOutputStream.close();
                    }
                }
                return of;
            } finally {
            }
        } catch (Throwable th3) {
            if (byteArrayOutputStream != null) {
                if (th != null) {
                    try {
                        byteArrayOutputStream.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    byteArrayOutputStream.close();
                }
            }
            throw th3;
        }
    }

    private <T> List<Tuple2<byte[], T>> permute(Map<T, List<byte[]>> map) {
        ArrayList arrayList = new ArrayList(map.entrySet());
        Collections.shuffle(arrayList);
        return (List) arrayList.stream().flatMap(entry -> {
            return ((List) entry.getValue()).stream().map(bArr -> {
                return Tuple2.of(bArr, entry.getKey());
            });
        }).collect(Collectors.toList());
    }

    private <T> Map<T, List<Long>> write(DataOutputStream dataOutputStream, List<Tuple2<byte[], T>> list) throws IOException {
        HashMap hashMap = new HashMap();
        for (Tuple2<byte[], T> tuple2 : list) {
            ((List) hashMap.computeIfAbsent(tuple2.f1, obj -> {
                return new ArrayList();
            })).add(Long.valueOf(dataOutputStream.size()));
            Buffer buffer = null;
            try {
                buffer = wrap((byte[]) tuple2.f0);
                this.serializer.writeData(dataOutputStream, new Buffer[]{buffer});
                if (buffer != null) {
                    buffer.recycleBuffer();
                }
            } catch (Throwable th) {
                if (buffer != null) {
                    buffer.recycleBuffer();
                }
                throw th;
            }
        }
        return hashMap;
    }

    private NetworkBuffer wrap(byte[] bArr) {
        return new NetworkBuffer(MemorySegmentFactory.wrap(bArr), FreeingBufferRecycler.INSTANCE, Buffer.DataType.DATA_BUFFER, bArr.length);
    }

    private byte[] randomStateBytes() {
        byte[] bArr = new byte[this.stateBytesPerPart];
        this.random.nextBytes(bArr);
        return bArr;
    }

    private <T> void assertBuffersEquals(Map<T, List<byte[]>> map, Map<T, List<Buffer>> map2) {
        try {
            Assert.assertEquals(mapValues(map, this::concat), mapValues(map2, list -> {
                return concat(toBytes(list));
            }));
        } finally {
            map2.values().stream().flatMap((v0) -> {
                return v0.stream();
            }).forEach((v0) -> {
                v0.recycleBuffer();
            });
        }
    }

    private static <K, V1, V2> Map<K, V2> mapValues(Map<K, V1> map, Function<V1, V2> function) {
        return (Map) map.entrySet().stream().collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, entry -> {
            return function.apply(entry.getValue());
        }));
    }

    private NetworkBuffer concat(List<byte[]> list) {
        try {
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            Throwable th = null;
            try {
                try {
                    Iterator<byte[]> it = list.iterator();
                    while (it.hasNext()) {
                        byteArrayOutputStream.write(it.next());
                    }
                    NetworkBuffer wrap = wrap(byteArrayOutputStream.toByteArray());
                    if (byteArrayOutputStream != null) {
                        if (0 != 0) {
                            try {
                                byteArrayOutputStream.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            byteArrayOutputStream.close();
                        }
                    }
                    return wrap;
                } finally {
                }
            } finally {
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    private List<byte[]> toBytes(List<Buffer> list) {
        return (List) list.stream().map(buffer -> {
            byte[] bArr = new byte[buffer.getSize()];
            buffer.getNioBuffer(0, buffer.getSize()).get(bArr, 0, bArr.length);
            return bArr;
        }).collect(Collectors.toList());
    }
}
