/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.dataflow.sdk.io;

import com.google.api.client.util.BackOff;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.io.BoundedSource;
import com.google.cloud.dataflow.sdk.io.Read;
import com.google.cloud.dataflow.sdk.io.UnboundedSource;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.util.concurrent.Uninterruptibles;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.RemoveDuplicates;
import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
import com.google.cloud.dataflow.sdk.transforms.display.DisplayData;
import com.google.cloud.dataflow.sdk.util.FluentBackoff;
import com.google.cloud.dataflow.sdk.util.StringUtils;
import com.google.cloud.dataflow.sdk.util.ValueWithRecordId;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.sdk.values.PInput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.TimeUnit;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;
import org.joda.time.ReadableInstant;

class BoundedReadFromUnboundedSource<T>
extends PTransform<PInput, PCollection<T>> {
    private final UnboundedSource<T, ?> source;
    private final long maxNumRecords;
    private final Duration maxReadTime;
    private static final FluentBackoff BACKOFF_FACTORY = FluentBackoff.DEFAULT.withInitialBackoff(Duration.millis((long)10L)).withMaxBackoff(Duration.standardSeconds((long)10L));

    public BoundedReadFromUnboundedSource<T> withMaxNumRecords(long maxNumRecords) {
        return new BoundedReadFromUnboundedSource<T>(this.source, maxNumRecords, this.maxReadTime);
    }

    public BoundedReadFromUnboundedSource<T> withMaxReadTime(Duration maxReadTime) {
        return new BoundedReadFromUnboundedSource<T>(this.source, this.maxNumRecords, maxReadTime);
    }

    BoundedReadFromUnboundedSource(UnboundedSource<T, ?> source, long maxNumRecords, Duration maxReadTime) {
        this.source = source;
        this.maxNumRecords = maxNumRecords;
        this.maxReadTime = maxReadTime;
    }

    @Override
    public PCollection<T> apply(PInput input) {
        PCollection read = (PCollection)Pipeline.applyTransform(input, Read.from(new UnboundedToBoundedSourceAdapter(this.source, this.maxNumRecords, this.maxReadTime)));
        if (this.source.requiresDeduping()) {
            read = (PCollection)read.apply(RemoveDuplicates.withRepresentativeValueFn(new SerializableFunction<ValueWithRecordId<T>, byte[]>(){

                @Override
                public byte[] apply(ValueWithRecordId<T> input) {
                    return input.getId();
                }
            }));
        }
        return read.apply(ValueWithRecordId.stripIds());
    }

    @Override
    protected Coder<T> getDefaultOutputCoder() {
        return this.source.getDefaultOutputCoder();
    }

    @Override
    public String getKindString() {
        return "Read(" + StringUtils.approximateSimpleName(this.source.getClass()) + ")";
    }

    @Override
    public void populateDisplayData(DisplayData.Builder builder) {
        builder.add(DisplayData.item("source", this.source.getClass()).withLabel("Read Source")).addIfNotDefault(DisplayData.item("maxRecords", this.maxNumRecords).withLabel("Maximum Read Records"), Long.MAX_VALUE).addIfNotNull(DisplayData.item("maxReadTime", this.maxReadTime).withLabel("Maximum Read Time")).include(this.source);
    }

    private static class UnboundedToBoundedSourceAdapter<T>
    extends BoundedSource<ValueWithRecordId<T>> {
        private final UnboundedSource<T, ?> source;
        private final long maxNumRecords;
        private final Duration maxReadTime;

        private UnboundedToBoundedSourceAdapter(UnboundedSource<T, ?> source, long maxNumRecords, Duration maxReadTime) {
            this.source = source;
            this.maxNumRecords = maxNumRecords;
            this.maxReadTime = maxReadTime;
        }

        private static long[] splitNumRecords(long numRecords, int numSplits) {
            int i;
            long[] splitNumRecords = new long[numSplits];
            for (i = 0; i < numSplits; ++i) {
                splitNumRecords[i] = numRecords / (long)numSplits;
            }
            i = 0;
            while ((long)i < numRecords % (long)numSplits) {
                splitNumRecords[i] = splitNumRecords[i] + 1L;
                ++i;
            }
            return splitNumRecords;
        }

        private static int numInitialSplits(long numRecords) {
            int maxSplits = 100;
            long recordsPerSplit = 10000L;
            return (int)Math.min(100L, numRecords / 10000L + 1L);
        }

        @Override
        public List<? extends BoundedSource<ValueWithRecordId<T>>> splitIntoBundles(long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
            ArrayList<UnboundedToBoundedSourceAdapter<T>> result = new ArrayList<UnboundedToBoundedSourceAdapter<T>>();
            int numInitialSplits = UnboundedToBoundedSourceAdapter.numInitialSplits(this.maxNumRecords);
            List<UnboundedSource<T, ?>> splits = this.source.generateInitialSplits(numInitialSplits, options);
            int numSplits = splits.size();
            long[] numRecords = UnboundedToBoundedSourceAdapter.splitNumRecords(this.maxNumRecords, numSplits);
            for (int i = 0; i < numSplits; ++i) {
                result.add(new UnboundedToBoundedSourceAdapter<T>(splits.get(i), numRecords[i], this.maxReadTime));
            }
            return result;
        }

        @Override
        public long getEstimatedSizeBytes(PipelineOptions options) {
            return 0L;
        }

        @Override
        public boolean producesSortedKeys(PipelineOptions options) {
            return false;
        }

        @Override
        public Coder<ValueWithRecordId<T>> getDefaultOutputCoder() {
            return ValueWithRecordId.ValueWithRecordIdCoder.of(this.source.getDefaultOutputCoder());
        }

        @Override
        public void validate() {
            this.source.validate();
        }

        @Override
        public BoundedSource.BoundedReader<ValueWithRecordId<T>> createReader(PipelineOptions options) {
            return new Reader(this.source.createReader(options, null));
        }

        private class Reader
        extends BoundedSource.BoundedReader<ValueWithRecordId<T>> {
            private long recordsRead = 0L;
            private Instant endTime = Instant.now().plus((ReadableDuration)UnboundedToBoundedSourceAdapter.access$200(UnboundedToBoundedSourceAdapter.this));
            private UnboundedSource.UnboundedReader<T> reader;

            private Reader(UnboundedSource.UnboundedReader<T> reader) {
                this.endTime = UnboundedToBoundedSourceAdapter.this.maxReadTime != null ? Instant.now().plus((ReadableDuration)UnboundedToBoundedSourceAdapter.this.maxReadTime) : null;
                this.reader = reader;
            }

            @Override
            public boolean start() throws IOException {
                if (UnboundedToBoundedSourceAdapter.this.maxNumRecords <= 0L || UnboundedToBoundedSourceAdapter.this.maxReadTime != null && UnboundedToBoundedSourceAdapter.this.maxReadTime.getMillis() == 0L) {
                    return false;
                }
                ++this.recordsRead;
                if (this.reader.start()) {
                    return true;
                }
                return this.advanceWithBackoff();
            }

            @Override
            public boolean advance() throws IOException {
                if (this.recordsRead >= UnboundedToBoundedSourceAdapter.this.maxNumRecords) {
                    this.finalizeCheckpoint();
                    return false;
                }
                ++this.recordsRead;
                return this.advanceWithBackoff();
            }

            private boolean advanceWithBackoff() throws IOException {
                BackOff backoff = BACKOFF_FACTORY.backoff();
                long nextSleep = backoff.nextBackOffMillis();
                while (nextSleep != -1L) {
                    if (this.endTime != null && Instant.now().isAfter((ReadableInstant)this.endTime)) {
                        this.finalizeCheckpoint();
                        return false;
                    }
                    if (this.reader.advance()) {
                        return true;
                    }
                    Uninterruptibles.sleepUninterruptibly(nextSleep, TimeUnit.MILLISECONDS);
                    nextSleep = backoff.nextBackOffMillis();
                }
                this.finalizeCheckpoint();
                return false;
            }

            private void finalizeCheckpoint() throws IOException {
                this.reader.getCheckpointMark().finalizeCheckpoint();
            }

            @Override
            public ValueWithRecordId<T> getCurrent() throws NoSuchElementException {
                return new ValueWithRecordId(this.reader.getCurrent(), this.reader.getCurrentRecordId());
            }

            @Override
            public Instant getCurrentTimestamp() throws NoSuchElementException {
                return this.reader.getCurrentTimestamp();
            }

            @Override
            public void close() throws IOException {
                this.reader.close();
            }

            @Override
            public BoundedSource<ValueWithRecordId<T>> getCurrentSource() {
                return UnboundedToBoundedSourceAdapter.this;
            }
        }
    }
}

