/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.dataflow.sdk.util;

import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.transforms.Combine;
import com.google.cloud.dataflow.sdk.transforms.CombineWithContext;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.util.AppliedCombineFn;
import com.google.cloud.dataflow.sdk.util.ReduceFn;
import com.google.cloud.dataflow.sdk.util.state.BagState;
import com.google.cloud.dataflow.sdk.util.state.CombiningState;
import com.google.cloud.dataflow.sdk.util.state.MergingStateAccessor;
import com.google.cloud.dataflow.sdk.util.state.ReadableState;
import com.google.cloud.dataflow.sdk.util.state.StateAccessor;
import com.google.cloud.dataflow.sdk.util.state.StateMerging;
import com.google.cloud.dataflow.sdk.util.state.StateTag;
import com.google.cloud.dataflow.sdk.util.state.StateTags;

public abstract class SystemReduceFn<K, InputT, AccumT, OutputT, W extends BoundedWindow>
extends ReduceFn<K, InputT, OutputT, W> {
    private static final String BUFFER_NAME = "buf";
    private StateTag<? super K, ? extends CombiningState<InputT, OutputT>> bufferTag;

    public static <K, T, W extends BoundedWindow> SystemReduceFn<K, T, Iterable<T>, Iterable<T>, W> buffering(Coder<T> inputCoder) {
        final StateTag<Object, BagState<T>> bufferTag = StateTags.makeSystemTagInternal(StateTags.bag(BUFFER_NAME, inputCoder));
        return new SystemReduceFn<K, T, Iterable<T>, Iterable<T>, W>(bufferTag){

            @Override
            public void prefetchOnMerge(MergingStateAccessor<K, W> state) throws Exception {
                StateMerging.prefetchBags(state, bufferTag);
            }

            @Override
            public void onMerge(ReduceFn.OnMergeContext c) throws Exception {
                StateMerging.mergeBags(c.state(), bufferTag);
            }
        };
    }

    public static <K, InputT, AccumT, OutputT, W extends BoundedWindow> SystemReduceFn<K, InputT, AccumT, OutputT, W> combining(Coder<K> keyCoder, AppliedCombineFn<K, InputT, AccumT, OutputT> combineFn) {
        final StateTag bufferTag = combineFn.getFn() instanceof CombineWithContext.KeyedCombineFnWithContext ? StateTags.makeSystemTagInternal(StateTags.keyedCombiningValueWithContext(BUFFER_NAME, combineFn.getAccumulatorCoder(), (CombineWithContext.KeyedCombineFnWithContext)combineFn.getFn())) : StateTags.makeSystemTagInternal(StateTags.keyedCombiningValue(BUFFER_NAME, combineFn.getAccumulatorCoder(), (Combine.KeyedCombineFn)combineFn.getFn()));
        return new SystemReduceFn<K, InputT, AccumT, OutputT, W>(bufferTag){

            @Override
            public void prefetchOnMerge(MergingStateAccessor<K, W> state) throws Exception {
                StateMerging.prefetchCombiningValues(state, bufferTag);
            }

            @Override
            public void onMerge(ReduceFn.OnMergeContext c) throws Exception {
                StateMerging.mergeCombiningValues(c.state(), bufferTag);
            }
        };
    }

    public SystemReduceFn(StateTag<? super K, ? extends CombiningState<InputT, OutputT>> bufferTag) {
        this.bufferTag = bufferTag;
    }

    @Override
    public void processValue(ReduceFn.ProcessValueContext c) throws Exception {
        c.state().access(this.bufferTag).add(c.value());
    }

    @Override
    public void prefetchOnTrigger(StateAccessor<K> state) {
        state.access(this.bufferTag).readLater();
    }

    @Override
    public void onTrigger(ReduceFn.OnTriggerContext c) throws Exception {
        c.output(c.state().access(this.bufferTag).read());
    }

    @Override
    public void clearState(ReduceFn.Context c) throws Exception {
        c.state().access(this.bufferTag).clear();
    }

    @Override
    public ReadableState<Boolean> isEmpty(StateAccessor<K> state) {
        return state.access(this.bufferTag).isEmpty();
    }
}

