/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.dataflow.sdk.testing;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.DurationCoder;
import com.google.cloud.dataflow.sdk.coders.InstantCoder;
import com.google.cloud.dataflow.sdk.coders.IterableCoder;
import com.google.cloud.dataflow.sdk.coders.StandardCoder;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.annotations.VisibleForTesting;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.ImmutableCollection;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.ImmutableList;
import com.google.cloud.dataflow.sdk.testing.AutoValue_TestStream_ElementEvent;
import com.google.cloud.dataflow.sdk.testing.AutoValue_TestStream_ProcessingTimeEvent;
import com.google.cloud.dataflow.sdk.testing.AutoValue_TestStream_WatermarkEvent;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.util.VarInt;
import com.google.cloud.dataflow.sdk.values.PBegin;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.sdk.values.TimestampedValue;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Collections;
import java.util.List;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;
import org.joda.time.ReadableInstant;

public final class TestStream<T>
extends PTransform<PBegin, PCollection<T>> {
    private final List<Event<T>> events;
    private final Coder<T> coder;

    /**
     * Starts building a {@link TestStream} whose values are encoded with the given coder.
     *
     * @param coder the coder for the values the stream will carry
     * @return a new {@link Builder} that contains no events yet
     */
    public static <T> Builder<T> create(Coder<T> coder) {
        // Instantiate the builder with its type argument; the raw form only compiled through an
        // unchecked conversion.
        return new Builder<T>(coder);
    }

    /**
     * Creates a stream that carries the given events.
     *
     * @param coder the coder for the values contained in the events
     * @param events the events of this stream; must not be {@code null}
     */
    private TestStream(Coder<T> coder, List<Event<T>> events) {
        // Reject a missing event list up front.
        this.events = Preconditions.checkNotNull(events);
        this.coder = coder;
    }

    /**
     * Always fails: this default expansion reports that the pipeline's runner cannot execute a
     * {@link TestStream}.
     *
     * <p>NOTE(review): presumably runners that support this transform substitute their own
     * implementation instead of calling this method; confirm against the runner code.
     *
     * @param input the beginning of the pipeline this transform is applied to
     * @throws IllegalStateException on every call, naming the runner and this transform
     */
    @Override
    public PCollection<T> apply(PBegin input) {
        String runnerName = input.getPipeline().getRunner().getClass().getSimpleName();
        String transformName = getClass().getSimpleName();
        String message = String.format("Pipeline Runner %s does not support %s", runnerName, transformName);
        throw new IllegalStateException(message);
    }

    /**
     * Returns the coder this stream was created with, used for the values inside its events.
     */
    public Coder<T> getValueCoder() {
        return coder;
    }

    /**
     * Returns a coder for this stream's {@link Event}s, built on top of the value coder.
     *
     * <p>A new {@link EventCoder} is created on each call.
     */
    public Coder<Event<T>> getEventCoder() {
        Coder<Event<T>> eventCoder = EventCoder.of(coder);
        return eventCoder;
    }

    /**
     * Returns the events of this stream, exactly as the list was supplied at construction.
     */
    public List<Event<T>> getEvents() {
        return events;
    }

    /**
     * A {@link Coder} for {@link Event}s of a {@link TestStream}.
     *
     * <p>Each event is written as the ordinal of its {@link EventType} (as a var-int) followed by
     * a type-specific payload: the timestamped elements, the new watermark, or the processing
     * time advance. Because the ordinal is the wire tag, the declaration order of
     * {@link EventType} must stay stable.
     */
    @VisibleForTesting
    static final class EventCoder<T>
    extends StandardCoder<Event<T>> {
        private static final Coder<ReadableDuration> DURATION_CODER = DurationCoder.of();
        private static final Coder<Instant> INSTANT_CODER = InstantCoder.of();
        private final Coder<T> valueCoder;
        private final Coder<Iterable<TimestampedValue<T>>> elementCoder;

        /**
         * Creates an {@link EventCoder} whose element payloads are encoded with {@code valueCoder}.
         */
        public static <T> EventCoder<T> of(Coder<T> valueCoder) {
            return new EventCoder<T>(valueCoder);
        }

        /**
         * Creates an {@link EventCoder} from its JSON {@code component_encodings}.
         *
         * @param components the component coders; must contain exactly one coder, the value coder
         * @throws IllegalArgumentException if {@code components} does not have exactly one entry
         */
        @JsonCreator
        public static <T> EventCoder<T> of(@JsonProperty(value="component_encodings") List<? extends Coder<?>> components) {
            Preconditions.checkArgument(components.size() == 1, "Was expecting exactly one component coder, got %s", components.size());
            // The component list is untyped by construction; the single component is the value
            // coder, so narrow it here instead of building a raw EventCoder.
            @SuppressWarnings("unchecked")
            Coder<T> valueCoder = (Coder<T>) components.get(0);
            return new EventCoder<T>(valueCoder);
        }

        private EventCoder(Coder<T> valueCoder) {
            this.valueCoder = valueCoder;
            this.elementCoder = IterableCoder.of(TimestampedValue.TimestampedValueCoder.of(valueCoder));
        }

        /**
         * Writes the event type ordinal followed by the payload that belongs to that type.
         *
         * @throws IOException if writing to {@code outStream} fails
         */
        @Override
        public void encode(Event<T> value, OutputStream outStream, Coder.Context context) throws IOException {
            // The ordinal is the wire tag that decode() uses to select the payload coder.
            VarInt.encode(value.getType().ordinal(), outStream);
            switch (value.getType()) {
                case ELEMENT: {
                    Iterable<TimestampedValue<T>> elems = ((ElementEvent<T>) value).getElements();
                    this.elementCoder.encode(elems, outStream, context);
                    break;
                }
                case WATERMARK: {
                    Instant ts = ((WatermarkEvent<T>) value).getWatermark();
                    INSTANT_CODER.encode(ts, outStream, context);
                    break;
                }
                case PROCESSING_TIME: {
                    Duration processingAdvance = ((ProcessingTimeEvent<T>) value).getProcessingTimeAdvance();
                    DURATION_CODER.encode(processingAdvance, outStream, context);
                    break;
                }
                default: {
                    throw new AssertionError("Unreachable: Unsupported Event Type " + value.getType());
                }
            }
        }

        /**
         * Reads an event previously written by {@link #encode}.
         *
         * @throws IOException if reading fails or the stream carries an unknown event type tag
         */
        @Override
        public Event<T> decode(InputStream inStream, Coder.Context context) throws IOException {
            int typeOrdinal = VarInt.decodeInt(inStream);
            EventType[] knownTypes = EventType.values();
            // Validate the tag before indexing so corrupt input surfaces as a decoding failure
            // rather than an ArrayIndexOutOfBoundsException.
            if (typeOrdinal < 0 || typeOrdinal >= knownTypes.length) {
                throw new IOException("Unknown TestStream event type ordinal: " + typeOrdinal);
            }
            EventType eventType = knownTypes[typeOrdinal];
            switch (eventType) {
                case ELEMENT: {
                    Iterable<TimestampedValue<T>> elements = this.elementCoder.decode(inStream, context);
                    return ElementEvent.add(elements);
                }
                case WATERMARK: {
                    return WatermarkEvent.advanceTo(INSTANT_CODER.decode(inStream, context));
                }
                case PROCESSING_TIME: {
                    return ProcessingTimeEvent.advanceBy(DURATION_CODER.decode(inStream, context).toDuration());
                }
            }
            throw new AssertionError("Unreachable: Unsupported Event Type " + eventType);
        }

        /**
         * Returns the single component coder: the value coder.
         */
        @Override
        public List<? extends Coder<?>> getCoderArguments() {
            return Collections.singletonList(this.valueCoder);
        }

        /**
         * Verifies that every payload coder (elements, durations, instants) is deterministic.
         *
         * @throws Coder.NonDeterministicException if any of the payload coders is not deterministic
         */
        @Override
        public void verifyDeterministic() throws Coder.NonDeterministicException {
            this.elementCoder.verifyDeterministic();
            DURATION_CODER.verifyDeterministic();
            INSTANT_CODER.verifyDeterministic();
        }
    }

    /**
     * An {@link Event} of type {@link EventType#PROCESSING_TIME} that carries the amount by which
     * processing time is advanced.
     */
    public static abstract class ProcessingTimeEvent<T>
    implements Event<T> {
        /**
         * Returns the amount of processing time carried by this event.
         */
        public abstract Duration getProcessingTimeAdvance();

        /**
         * Creates a processing time event for the given amount.
         */
        static <T> Event<T> advanceBy(Duration amount) {
            // Parameterize the generated value class instead of instantiating it raw.
            return new AutoValue_TestStream_ProcessingTimeEvent<T>(EventType.PROCESSING_TIME, amount);
        }
    }

    /**
     * An {@link Event} of type {@link EventType#WATERMARK} that carries a new watermark value.
     */
    public static abstract class WatermarkEvent<T>
    implements Event<T> {
        /**
         * Returns the watermark carried by this event.
         */
        public abstract Instant getWatermark();

        /**
         * Creates a watermark event for the given instant.
         */
        static <T> Event<T> advanceTo(Instant newWatermark) {
            // Parameterize the generated value class instead of instantiating it raw.
            return new AutoValue_TestStream_WatermarkEvent<T>(EventType.WATERMARK, newWatermark);
        }
    }

    /**
     * An {@link Event} of type {@link EventType#ELEMENT} that carries timestamped elements.
     */
    public static abstract class ElementEvent<T>
    implements Event<T> {
        /**
         * Returns the timestamped elements carried by this event.
         */
        public abstract Iterable<TimestampedValue<T>> getElements();

        /**
         * Creates an element event from at least one element.
         *
         * @param element the first element
         * @param elements any further elements, kept in the given order after {@code element}
         */
        @SafeVarargs
        static <T> Event<T> add(TimestampedValue<T> element, TimestampedValue<T> ... elements) {
            // Build a typed immutable list rather than going through raw builder casts.
            ImmutableList<TimestampedValue<T>> all = ImmutableList.<TimestampedValue<T>>builder()
                .add(element)
                .add(elements)
                .build();
            return ElementEvent.add(all);
        }

        /**
         * Creates an element event that carries the given elements as-is.
         */
        static <T> Event<T> add(Iterable<TimestampedValue<T>> elements) {
            return new AutoValue_TestStream_ElementEvent<T>(EventType.ELEMENT, elements);
        }
    }

    /**
     * The kinds of {@link Event} a {@link TestStream} can contain.
     *
     * <p>The declaration order is significant: {@link EventCoder} writes {@code ordinal()} when
     * encoding and indexes {@code values()} when decoding, so reordering or inserting constants
     * changes the encoded form.
     */
    public static enum EventType {
        // Event carrying timestamped elements (see ElementEvent).
        ELEMENT,
        // Event carrying a new watermark (see WatermarkEvent).
        WATERMARK,
        // Event carrying a processing time advance (see ProcessingTimeEvent).
        PROCESSING_TIME;

    }

    public static interface Event<T> {
        public EventType getType();
    }

    /**
     * An immutable, incremental builder for a {@link TestStream}.
     *
     * <p>Every {@code add...}/{@code advance...} method returns a new builder containing the
     * previous events plus one more; the receiver is never modified. The builder tracks a current
     * watermark, which starts at {@link BoundedWindow#TIMESTAMP_MIN_VALUE}. The stream is
     * finished with {@link #advanceWatermarkToInfinity()}.
     */
    public static class Builder<T> {
        private final Coder<T> coder;
        private final ImmutableList<Event<T>> events;
        private final Instant currentWatermark;

        private Builder(Coder<T> coder) {
            this(coder, ImmutableList.<Event<T>>of(), BoundedWindow.TIMESTAMP_MIN_VALUE);
        }

        private Builder(Coder<T> coder, ImmutableList<Event<T>> events, Instant currentWatermark) {
            this.coder = coder;
            this.events = events;
            this.currentWatermark = currentWatermark;
        }

        /**
         * Adds the given values as one element event, each timestamped at the current watermark.
         *
         * @param first the first value to add
         * @param rest any further values to add to the same event
         * @return a new builder that additionally contains the element event
         * @throws IllegalArgumentException if the current watermark is not before
         *     {@link BoundedWindow#TIMESTAMP_MAX_VALUE}
         */
        @SafeVarargs
        public final Builder<T> addElements(T first, T ... rest) {
            TimestampedValue<T> firstElement = TimestampedValue.of(first, this.currentWatermark);
            // Generic arrays cannot be created directly; the array is filled only with
            // TimestampedValue<T> below, so the unchecked conversion is safe.
            @SuppressWarnings("unchecked")
            TimestampedValue<T>[] remainingElements = new TimestampedValue[rest.length];
            for (int i = 0; i < rest.length; ++i) {
                remainingElements[i] = TimestampedValue.of(rest[i], this.currentWatermark);
            }
            return this.addElements(firstElement, remainingElements);
        }

        /**
         * Adds the given timestamped values as one element event.
         *
         * @param first the first element to add
         * @param rest any further elements to add to the same event
         * @return a new builder that additionally contains the element event
         * @throws IllegalArgumentException if any element's timestamp is not before
         *     {@link BoundedWindow#TIMESTAMP_MAX_VALUE}
         */
        @SafeVarargs
        public final Builder<T> addElements(TimestampedValue<T> first, TimestampedValue<T> ... rest) {
            checkElementTimestamp(first);
            for (TimestampedValue<T> element : rest) {
                checkElementTimestamp(element);
            }
            return new Builder<T>(this.coder, this.eventsPlus(ElementEvent.add(first, rest)), this.currentWatermark);
        }

        /**
         * Adds a watermark event and makes {@code newWatermark} the builder's current watermark.
         *
         * @param newWatermark the new watermark; must be after the current watermark and before
         *     {@link BoundedWindow#TIMESTAMP_MAX_VALUE}
         * @return a new builder that additionally contains the watermark event
         * @throws IllegalArgumentException if {@code newWatermark} violates either bound
         */
        public Builder<T> advanceWatermarkTo(Instant newWatermark) {
            Preconditions.checkArgument(newWatermark.isAfter(this.currentWatermark), "The watermark must monotonically advance");
            Preconditions.checkArgument(newWatermark.isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE), "The Watermark cannot progress beyond the maximum. Got: %s. Maximum: %s", newWatermark, BoundedWindow.TIMESTAMP_MAX_VALUE);
            return new Builder<T>(this.coder, this.eventsPlus(WatermarkEvent.<T>advanceTo(newWatermark)), newWatermark);
        }

        /**
         * Adds a processing time event for the given amount; the current watermark is unchanged.
         *
         * @param amount the amount to advance processing time by; must be longer than zero
         *     milliseconds
         * @return a new builder that additionally contains the processing time event
         * @throws IllegalArgumentException if {@code amount} is not positive
         */
        public Builder<T> advanceProcessingTime(Duration amount) {
            // The template needs a %s placeholder, otherwise the offending amount is not
            // interpolated into the message.
            Preconditions.checkArgument(amount.getMillis() > 0L, "Must advance the processing time by a positive amount. Got: %s", amount);
            return new Builder<T>(this.coder, this.eventsPlus(ProcessingTimeEvent.<T>advanceBy(amount)), this.currentWatermark);
        }

        /**
         * Adds a final watermark event at {@link BoundedWindow#TIMESTAMP_MAX_VALUE} and builds the
         * {@link TestStream}.
         *
         * @return the completed stream containing every event added so far plus the final
         *     watermark event
         */
        public TestStream<T> advanceWatermarkToInfinity() {
            return new TestStream<T>(this.coder, this.eventsPlus(WatermarkEvent.<T>advanceTo(BoundedWindow.TIMESTAMP_MAX_VALUE)));
        }

        /** Returns a new immutable list holding this builder's events followed by {@code next}. */
        private ImmutableList<Event<T>> eventsPlus(Event<T> next) {
            return ImmutableList.<Event<T>>builder().addAll(this.events).add(next).build();
        }

        /** Rejects an element whose timestamp is not before the maximum timestamp. */
        private static void checkElementTimestamp(TimestampedValue<?> element) {
            Preconditions.checkArgument(element.getTimestamp().isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE), "Elements must have timestamps before %s. Got: %s", BoundedWindow.TIMESTAMP_MAX_VALUE, element.getTimestamp());
        }
    }
}

