/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.dataflow.sdk.runners.inprocess;

import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessEvaluationContext;
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner;
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessTransformResult;
import com.google.cloud.dataflow.sdk.runners.inprocess.PassthroughTransformEvaluator;
import com.google.cloud.dataflow.sdk.runners.inprocess.StepTransformResult;
import com.google.cloud.dataflow.sdk.runners.inprocess.TransformEvaluator;
import com.google.cloud.dataflow.sdk.runners.inprocess.TransformEvaluatorFactory;
import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.values.PCollection;
import java.util.Collection;
import javax.annotation.Nullable;
import org.joda.time.Instant;

class WindowEvaluatorFactory
implements TransformEvaluatorFactory {
    WindowEvaluatorFactory() {
    }

    @Override
    public <InputT> TransformEvaluator<InputT> forApplication(AppliedPTransform<?, ?, ?> application, @Nullable InProcessPipelineRunner.CommittedBundle<?> inputBundle, InProcessEvaluationContext evaluationContext) throws Exception {
        return this.createTransformEvaluator(application, inputBundle, evaluationContext);
    }

    private <InputT> TransformEvaluator<InputT> createTransformEvaluator(AppliedPTransform<PCollection<InputT>, PCollection<InputT>, Window.Bound<InputT>> transform, InProcessPipelineRunner.CommittedBundle<?> inputBundle, InProcessEvaluationContext evaluationContext) {
        WindowFn<InputT, ?> fn = transform.getTransform().getWindowFn();
        InProcessPipelineRunner.UncommittedBundle<InputT> outputBundle = evaluationContext.createBundle(inputBundle, transform.getOutput());
        if (fn == null) {
            return PassthroughTransformEvaluator.create(transform, outputBundle);
        }
        return new WindowIntoEvaluator<InputT>(transform, fn, outputBundle);
    }

    private static class InProcessAssignContext<InputT, W extends BoundedWindow>
    extends WindowFn.AssignContext {
        private final WindowedValue<InputT> value;

        public InProcessAssignContext(WindowFn<InputT, W> fn, WindowedValue<InputT> value) {
            this.value = value;
        }

        public InputT element() {
            return this.value.getValue();
        }

        @Override
        public Instant timestamp() {
            return this.value.getTimestamp();
        }

        @Override
        public Collection<? extends BoundedWindow> windows() {
            return this.value.getWindows();
        }
    }

    private static class WindowIntoEvaluator<InputT>
    implements TransformEvaluator<InputT> {
        private final AppliedPTransform<PCollection<InputT>, PCollection<InputT>, Window.Bound<InputT>> transform;
        private final WindowFn<InputT, ?> windowFn;
        private final InProcessPipelineRunner.UncommittedBundle<InputT> outputBundle;

        public WindowIntoEvaluator(AppliedPTransform<PCollection<InputT>, PCollection<InputT>, Window.Bound<InputT>> transform, WindowFn<? super InputT, ?> windowFn, InProcessPipelineRunner.UncommittedBundle<InputT> outputBundle) {
            this.outputBundle = outputBundle;
            this.transform = transform;
            this.windowFn = windowFn;
        }

        @Override
        public void processElement(WindowedValue<InputT> element) throws Exception {
            Collection<BoundedWindow> windows = this.assignWindows(this.windowFn, element);
            this.outputBundle.add(WindowedValue.of(element.getValue(), element.getTimestamp(), windows, PaneInfo.NO_FIRING));
        }

        private <W extends BoundedWindow> Collection<? extends BoundedWindow> assignWindows(WindowFn<InputT, W> windowFn, WindowedValue<InputT> element) throws Exception {
            InProcessAssignContext<InputT, W> assignContext = new InProcessAssignContext<InputT, W>(windowFn, element);
            Collection<W> windows = windowFn.assignWindows(assignContext);
            return windows;
        }

        @Override
        public InProcessTransformResult finishBundle() throws Exception {
            return StepTransformResult.withoutHold(this.transform).addOutput(this.outputBundle, new InProcessPipelineRunner.UncommittedBundle[0]).build();
        }
    }
}

