/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.state.changelog;

import java.io.Closeable;
import java.util.Collection;
import java.util.concurrent.ExecutorService;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.ConfigurableStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle;
import org.apache.flink.runtime.state.changelog.ChangelogStateHandle;
import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.runtime.taskmanager.AsynchronousException;
import org.apache.flink.state.changelog.AbstractChangelogStateBackend;
import org.apache.flink.state.changelog.ChangelogKeyedStateBackend;
import org.apache.flink.state.changelog.ChangelogStateBackendMetricGroup;
import org.apache.flink.state.changelog.ChangelogStateFactory;
import org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation;
import org.apache.flink.state.common.ChangelogMaterializationMetricGroup;
import org.apache.flink.state.common.PeriodicMaterializationManager;
import org.apache.flink.util.Preconditions;

@Internal
public class ChangelogStateBackend
extends AbstractChangelogStateBackend
implements ConfigurableStateBackend {
    private static final long serialVersionUID = 1000L;

    ChangelogStateBackend(StateBackend stateBackend) {
        super(stateBackend);
    }

    public StateBackend configure(ReadableConfig config, ClassLoader classLoader) throws IllegalConfigurationException {
        if (this.delegatedStateBackend instanceof ConfigurableStateBackend) {
            return new ChangelogStateBackend(((ConfigurableStateBackend)this.delegatedStateBackend).configure(config, classLoader));
        }
        return this;
    }

    @Override
    protected <K> CheckpointableKeyedStateBackend<K> restore(Environment env, String operatorIdentifier, KeyGroupRange keyGroupRange, TtlTimeProvider ttlTimeProvider, MetricGroup metricGroup, Collection<ChangelogStateBackendHandle> stateBackendHandles, ChangelogBackendRestoreOperation.BaseBackendBuilder<K> baseBackendBuilder) throws Exception {
        StateChangelogStorage changelogStorage = (StateChangelogStorage)Preconditions.checkNotNull((Object)env.getTaskStateManager().getStateChangelogStorage(), (String)"Changelog storage is null when creating and restoring the ChangelogKeyedStateBackend.");
        String subtaskName = env.getTaskInfo().getTaskNameWithSubtasks();
        ExecutionConfig executionConfig = env.getExecutionConfig();
        ChangelogStateFactory changelogStateFactory = new ChangelogStateFactory();
        CheckpointableKeyedStateBackend<K> keyedStateBackend = ChangelogBackendRestoreOperation.restore(env.getTaskManagerInfo().getConfiguration(), env.getUserCodeClassLoader().asClassLoader(), env.getTaskStateManager(), stateBackendHandles, baseBackendBuilder, (baseBackend, baseState) -> new ChangelogKeyedStateBackend(baseBackend, subtaskName, executionConfig, ttlTimeProvider, new ChangelogStateBackendMetricGroup(metricGroup), (StateChangelogWriter<? extends ChangelogStateHandle>)((StateChangelogWriter<ChangelogStateHandle>)((StateChangelogWriter<? extends ChangelogStateHandle>)changelogStorage.createWriter(operatorIdentifier, keyGroupRange, env.getMainMailboxExecutor()))), (Collection<ChangelogStateBackendHandle>)baseState, (CheckpointStorageWorkerView)env.getCheckpointStorageAccess(), changelogStateFactory).getChangelogRestoreTarget());
        ChangelogKeyedStateBackend changelogKeyedStateBackend = (ChangelogKeyedStateBackend)keyedStateBackend;
        PeriodicMaterializationManager periodicMaterializationManager = new PeriodicMaterializationManager((MailboxExecutor)Preconditions.checkNotNull((Object)env.getMainMailboxExecutor()), (ExecutorService)Preconditions.checkNotNull((Object)env.getAsyncOperationsThreadPool()), subtaskName, (message, exception) -> env.failExternally((Throwable)new AsynchronousException(message, exception)), (PeriodicMaterializationManager.MaterializationTarget)changelogKeyedStateBackend, new ChangelogMaterializationMetricGroup(metricGroup), executionConfig.isPeriodicMaterializeEnabled(), executionConfig.getPeriodicMaterializeIntervalMillis(), executionConfig.getMaterializationMaxAllowedFailures(), operatorIdentifier);
        changelogKeyedStateBackend.registerCloseable((Closeable)periodicMaterializationManager);
        periodicMaterializationManager.start();
        return keyedStateBackend;
    }
}

