package org.apache.flink.runtime.state;

import java.io.IOException;
import java.io.InputStream;
import java.util.Collection;
import java.util.Map;
import javax.annotation.Nonnull;
import org.apache.commons.io.IOUtils;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.util.Preconditions;

/**
 * Restores operator (partitionable list) state and broadcast state from a collection of
 * {@link OperatorStateHandle}s into the state maps supplied at construction time.
 *
 * <p>For every non-null handle the operation reads the serialized backend meta information,
 * registers any state that is not yet present in the target maps, and then deserializes the
 * state values found at the offsets recorded in the handle.
 */
public class OperatorStateRestoreOperation implements RestoreOperation<Void> {
    /** Registry the input streams are registered with while they are being read. */
    private final CloseableRegistry closeStreamOnCancelRegistry;
    /** Class loader installed as thread context class loader while a handle is being read. */
    private final ClassLoader userClassloader;
    /** Target map for restored list states, keyed by state name. */
    private final Map<String, PartitionableListState<?>> registeredOperatorStates;
    /** Target map for restored broadcast states, keyed by state name. */
    private final Map<String, BackendWritableBroadcastState<?, ?>> registeredBroadcastStates;
    /** Handles to restore from; individual elements may be null and are skipped. */
    private final Collection<OperatorStateHandle> stateHandles;

    /**
     * Creates the restore operation.
     *
     * @param closeableRegistry registry used to track the opened state streams
     * @param classLoader class loader used for reading the serialized meta information
     * @param map target map of list states; restored states are added to it
     * @param map2 target map of broadcast states; restored states are added to it
     * @param collection state handles to restore from
     */
    public OperatorStateRestoreOperation(CloseableRegistry closeableRegistry, ClassLoader classLoader, Map<String, PartitionableListState<?>> map, Map<String, BackendWritableBroadcastState<?, ?>> map2, @Nonnull Collection<OperatorStateHandle> collection) {
        this.closeStreamOnCancelRegistry = closeableRegistry;
        this.userClassloader = classLoader;
        this.registeredOperatorStates = map;
        this.registeredBroadcastStates = map2;
        this.stateHandles = collection;
    }

    /**
     * Restores all state handles in iteration order, skipping null handles.
     *
     * @return always {@code null}
     * @throws Exception if opening, reading or deserializing any handle fails
     */
    @Override // org.apache.flink.runtime.state.RestoreOperation
    public Void restore() throws Exception {
        if (this.stateHandles.isEmpty()) {
            return null;
        }
        for (OperatorStateHandle stateHandle : this.stateHandles) {
            if (stateHandle == null) {
                continue;
            }
            restoreFromHandle(stateHandle);
        }
        return null;
    }

    /**
     * Restores meta information and state values from a single handle. The thread context class
     * loader is switched to the user class loader for the duration of the read.
     */
    private void restoreFromHandle(OperatorStateHandle stateHandle) throws Exception {
        FSDataInputStream in = stateHandle.openInputStream();
        this.closeStreamOnCancelRegistry.registerCloseable(in);
        ClassLoader previousClassLoader = Thread.currentThread().getContextClassLoader();
        try {
            Thread.currentThread().setContextClassLoader(this.userClassloader);
            OperatorBackendSerializationProxy serializationProxy = new OperatorBackendSerializationProxy(this.userClassloader);
            serializationProxy.read(new DataInputViewStreamWrapper(in));

            // Meta information must be registered before values are read, because the value
            // restore looks the states up by name in the two target maps.
            registerRestoredOperatorStates(serializationProxy);
            registerRestoredBroadcastStates(serializationProxy);
            restoreStateValues(stateHandle, in);
        } finally {
            // Single cleanup path for both success and failure: put the original context class
            // loader back and close the stream only if it was still registered with the registry.
            Thread.currentThread().setContextClassLoader(previousClassLoader);
            if (this.closeStreamOnCancelRegistry.unregisterCloseable(in)) {
                IOUtils.closeQuietly(in);
            }
        }
    }

    /**
     * Registers a new list state for every restored operator state snapshot whose name is not
     * yet present in the target map; states that are already registered are left untouched.
     *
     * @throws IOException if the serializer of a restored state could not be loaded
     */
    private void registerRestoredOperatorStates(OperatorBackendSerializationProxy serializationProxy) throws IOException {
        for (StateMetaInfoSnapshot snapshot : serializationProxy.getOperatorStateMetaInfoSnapshots()) {
            RegisteredOperatorStateBackendMetaInfo<?> restoredMetaInfo = new RegisteredOperatorStateBackendMetaInfo<>(snapshot);
            if (restoredMetaInfo.getPartitionStateSerializer() instanceof UnloadableDummyTypeSerializer) {
                throw new IOException("Unable to restore operator state [" + snapshot.getName() + "]. The previous typeSerializer of the operator state must be present; the typeSerializer could have been removed from the classpath, or its implementation have changed and could not be loaded. This is a temporary restriction that will be fixed in future versions.");
            }
            if (this.registeredOperatorStates.get(snapshot.getName()) == null) {
                PartitionableListState<?> restoredState = new PartitionableListState<>(restoredMetaInfo);
                this.registeredOperatorStates.put(restoredState.getStateMetaInfo().getName(), restoredState);
            }
        }
    }

    /**
     * Registers a new heap broadcast state for every restored broadcast state snapshot whose name
     * is not yet present in the target map; states that are already registered are left untouched.
     *
     * @throws IOException if the key or value serializer of a restored state could not be loaded
     */
    private void registerRestoredBroadcastStates(OperatorBackendSerializationProxy serializationProxy) throws IOException {
        for (StateMetaInfoSnapshot snapshot : serializationProxy.getBroadcastStateMetaInfoSnapshots()) {
            RegisteredBroadcastStateBackendMetaInfo<?, ?> restoredMetaInfo = new RegisteredBroadcastStateBackendMetaInfo<>(snapshot);
            boolean keySerializerMissing = restoredMetaInfo.getKeySerializer() instanceof UnloadableDummyTypeSerializer;
            boolean valueSerializerMissing = restoredMetaInfo.getValueSerializer() instanceof UnloadableDummyTypeSerializer;
            if (keySerializerMissing || valueSerializerMissing) {
                throw new IOException("Unable to restore broadcast state [" + snapshot.getName() + "]. The previous key and value serializers of the state must be present; the serializers could have been removed from the classpath, or their implementations have changed and could not be loaded. This is a temporary restriction that will be fixed in future versions.");
            }
            if (this.registeredBroadcastStates.get(snapshot.getName()) == null) {
                HeapBroadcastState<?, ?> restoredState = new HeapBroadcastState<>(restoredMetaInfo);
                this.registeredBroadcastStates.put(restoredState.getStateMetaInfo().getName(), restoredState);
            }
        }
    }

    /**
     * Reads the values of every state named in the handle's offset map. A name is first looked
     * up among the list states and, if absent there, among the broadcast states.
     *
     * @throws IllegalStateException (via {@link Preconditions#checkState}) if a name is found in
     *     neither map
     */
    private void restoreStateValues(OperatorStateHandle stateHandle, FSDataInputStream in) throws Exception {
        for (Map.Entry<String, OperatorStateHandle.StateMetaInfo> entry : stateHandle.getStateNameToPartitionOffsets().entrySet()) {
            String stateName = entry.getKey();
            PartitionableListState<?> listState = this.registeredOperatorStates.get(stateName);
            if (listState != null) {
                deserializeOperatorStateValues(listState, in, entry.getValue());
                continue;
            }
            BackendWritableBroadcastState<?, ?> broadcastState = this.registeredBroadcastStates.get(stateName);
            Preconditions.checkState(broadcastState != null, "Found state without corresponding meta info: " + stateName);
            deserializeBroadcastStateValues(broadcastState, in, entry.getValue());
        }
    }

    /**
     * Seeks to every offset recorded for the state and appends the element deserialized there to
     * the given list state. Does nothing if the meta info or its offsets are null.
     */
    private <S> void deserializeOperatorStateValues(PartitionableListState<S> partitionableListState, FSDataInputStream fSDataInputStream, OperatorStateHandle.StateMetaInfo stateMetaInfo) throws IOException {
        if (stateMetaInfo == null) {
            return;
        }
        long[] offsets = stateMetaInfo.getOffsets();
        if (offsets == null) {
            return;
        }
        DataInputViewStreamWrapper inputView = new DataInputViewStreamWrapper(fSDataInputStream);
        TypeSerializer<S> elementSerializer = partitionableListState.getStateMetaInfo().getPartitionStateSerializer();
        for (long offset : offsets) {
            fSDataInputStream.seek(offset);
            partitionableListState.add(elementSerializer.deserialize(inputView));
        }
    }

    /**
     * Seeks to the first recorded offset, reads an entry count and then that many key/value
     * pairs into the given broadcast state. Does nothing if the meta info or its offsets are
     * null.
     *
     * <p>Only {@code offsets[0]} is consulted; an empty offsets array results in an
     * {@link ArrayIndexOutOfBoundsException}.
     */
    private <K, V> void deserializeBroadcastStateValues(BackendWritableBroadcastState<K, V> backendWritableBroadcastState, FSDataInputStream fSDataInputStream, OperatorStateHandle.StateMetaInfo stateMetaInfo) throws Exception {
        if (stateMetaInfo == null) {
            return;
        }
        long[] offsets = stateMetaInfo.getOffsets();
        if (offsets == null) {
            return;
        }
        TypeSerializer<K> keySerializer = backendWritableBroadcastState.getStateMetaInfo().getKeySerializer();
        TypeSerializer<V> valueSerializer = backendWritableBroadcastState.getStateMetaInfo().getValueSerializer();
        fSDataInputStream.seek(offsets[0]);
        DataInputViewStreamWrapper inputView = new DataInputViewStreamWrapper(fSDataInputStream);
        int entryCount = inputView.readInt();
        for (int i = 0; i < entryCount; i++) {
            // Key is deserialized before the value, matching the evaluation order of the arguments.
            K key = keySerializer.deserialize(inputView);
            V value = valueSerializer.deserialize(inputView);
            backendWritableBroadcastState.put(key, value);
        }
    }
}
