/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.dataflow.sdk.io;

import com.google.cloud.dataflow.sdk.coders.BigEndianLongCoder;
import com.google.cloud.dataflow.sdk.coders.ByteArrayCoder;
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.CoderException;
import com.google.cloud.dataflow.sdk.coders.CustomCoder;
import com.google.cloud.dataflow.sdk.coders.KvCoder;
import com.google.cloud.dataflow.sdk.coders.NullableCoder;
import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
import com.google.cloud.dataflow.sdk.coders.VarIntCoder;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.annotations.VisibleForTesting;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.hash.Hashing;
import com.google.cloud.dataflow.sdk.transforms.Aggregator;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.Sum;
import com.google.cloud.dataflow.sdk.transforms.display.DisplayData;
import com.google.cloud.dataflow.sdk.transforms.windowing.AfterFirst;
import com.google.cloud.dataflow.sdk.transforms.windowing.AfterPane;
import com.google.cloud.dataflow.sdk.transforms.windowing.AfterProcessingTime;
import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows;
import com.google.cloud.dataflow.sdk.transforms.windowing.Repeatedly;
import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
import com.google.cloud.dataflow.sdk.util.CoderUtils;
import com.google.cloud.dataflow.sdk.util.PubsubClient;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.sdk.values.PDone;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;
import org.joda.time.Duration;

public class PubsubUnboundedSink<T>
extends PTransform<PCollection<T>, PDone> {
    private static final int DEFAULT_PUBLISH_BATCH_SIZE = 1000;
    private static final int DEFAULT_PUBLISH_BATCH_BYTES = 400000;
    private static final Duration DEFAULT_MAX_LATENCY = Duration.standardSeconds((long)2L);
    @VisibleForTesting
    static final Coder<PubsubClient.OutgoingMessage> CODER = new OutgoingMessageCoder();
    private final PubsubClient.PubsubClientFactory pubsubFactory;
    private final PubsubClient.TopicPath topic;
    private final Coder<T> elementCoder;
    @Nullable
    private final String timestampLabel;
    @Nullable
    private final String idLabel;
    private final int numShards;
    private final int publishBatchSize;
    private final int publishBatchBytes;
    private final Duration maxLatency;
    private final RecordIdMethod recordIdMethod;

    @VisibleForTesting
    PubsubUnboundedSink(PubsubClient.PubsubClientFactory pubsubFactory, PubsubClient.TopicPath topic, Coder<T> elementCoder, String timestampLabel, String idLabel, int numShards, int publishBatchSize, int publishBatchBytes, Duration maxLatency, RecordIdMethod recordIdMethod) {
        this.pubsubFactory = pubsubFactory;
        this.topic = topic;
        this.elementCoder = elementCoder;
        this.timestampLabel = timestampLabel;
        this.idLabel = idLabel;
        this.numShards = numShards;
        this.publishBatchSize = publishBatchSize;
        this.publishBatchBytes = publishBatchBytes;
        this.maxLatency = maxLatency;
        this.recordIdMethod = idLabel == null ? RecordIdMethod.NONE : recordIdMethod;
    }

    public PubsubUnboundedSink(PubsubClient.PubsubClientFactory pubsubFactory, PubsubClient.TopicPath topic, Coder<T> elementCoder, String timestampLabel, String idLabel, int numShards) {
        this(pubsubFactory, topic, elementCoder, timestampLabel, idLabel, numShards, 1000, 400000, DEFAULT_MAX_LATENCY, RecordIdMethod.RANDOM);
    }

    public PubsubClient.TopicPath getTopic() {
        return this.topic;
    }

    @Nullable
    public String getTimestampLabel() {
        return this.timestampLabel;
    }

    @Nullable
    public String getIdLabel() {
        return this.idLabel;
    }

    public Coder<T> getElementCoder() {
        return this.elementCoder;
    }

    @Override
    public PDone apply(PCollection<T> input) {
        ((PCollection)((PCollection)((PCollection)((PCollection)input.apply(Window.named("PubsubUnboundedSink.Window").into(new GlobalWindows()).triggering(Repeatedly.forever(AfterFirst.of(AfterPane.elementCountAtLeast(this.publishBatchSize), AfterProcessingTime.pastFirstElementInPane().plusDelayOf(this.maxLatency)))).discardingFiredPanes())).apply(ParDo.named("PubsubUnboundedSink.Shard").of(new ShardFn<T>(this.elementCoder, this.numShards, this.recordIdMethod)))).setCoder(KvCoder.of(VarIntCoder.of(), CODER))).apply(GroupByKey.create())).apply(ParDo.named("PubsubUnboundedSink.Writer").of(new WriterFn(this.pubsubFactory, this.topic, this.timestampLabel, this.idLabel, this.publishBatchSize, this.publishBatchBytes)));
        return PDone.in(input.getPipeline());
    }

    private static class WriterFn
    extends DoFn<KV<Integer, Iterable<PubsubClient.OutgoingMessage>>, Void> {
        private final PubsubClient.PubsubClientFactory pubsubFactory;
        private final PubsubClient.TopicPath topic;
        private final String timestampLabel;
        private final String idLabel;
        private final int publishBatchSize;
        private final int publishBatchBytes;
        @Nullable
        private transient PubsubClient pubsubClient;
        private final Aggregator<Long, Long> batchCounter = this.createAggregator("batches", new Sum.SumLongFn());
        private final Aggregator<Long, Long> elementCounter = this.createAggregator("elements", new Sum.SumLongFn());
        private final Aggregator<Long, Long> byteCounter = this.createAggregator("bytes", new Sum.SumLongFn());

        WriterFn(PubsubClient.PubsubClientFactory pubsubFactory, PubsubClient.TopicPath topic, String timestampLabel, String idLabel, int publishBatchSize, int publishBatchBytes) {
            this.pubsubFactory = pubsubFactory;
            this.topic = topic;
            this.timestampLabel = timestampLabel;
            this.idLabel = idLabel;
            this.publishBatchSize = publishBatchSize;
            this.publishBatchBytes = publishBatchBytes;
        }

        private void publishBatch(List<PubsubClient.OutgoingMessage> messages, int bytes) throws IOException {
            int n = this.pubsubClient.publish(this.topic, messages);
            Preconditions.checkState(n == messages.size(), "Attempted to publish %s messages but %s were successful", messages.size(), n);
            this.batchCounter.addValue(1L);
            this.elementCounter.addValue(Long.valueOf(messages.size()));
            this.byteCounter.addValue(Long.valueOf(bytes));
        }

        @Override
        public void startBundle(DoFn.Context c) throws Exception {
            Preconditions.checkState(this.pubsubClient == null, "startBundle invoked without prior finishBundle");
            this.pubsubClient = this.pubsubFactory.newClient(this.timestampLabel, this.idLabel, c.getPipelineOptions().as(DataflowPipelineOptions.class));
        }

        @Override
        public void processElement(DoFn.ProcessContext c) throws Exception {
            ArrayList<PubsubClient.OutgoingMessage> pubsubMessages = new ArrayList<PubsubClient.OutgoingMessage>(this.publishBatchSize);
            int bytes = 0;
            for (PubsubClient.OutgoingMessage message : (Iterable)((KV)c.element()).getValue()) {
                if (!pubsubMessages.isEmpty() && bytes + message.elementBytes.length > this.publishBatchBytes) {
                    this.publishBatch(pubsubMessages, bytes);
                    pubsubMessages.clear();
                    bytes = 0;
                }
                pubsubMessages.add(message);
                bytes += message.elementBytes.length;
            }
            if (!pubsubMessages.isEmpty()) {
                this.publishBatch(pubsubMessages, bytes);
            }
        }

        @Override
        public void finishBundle(DoFn.Context c) throws Exception {
            this.pubsubClient.close();
            this.pubsubClient = null;
        }

        @Override
        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            builder.add(DisplayData.item("topic", this.topic.getPath()));
            builder.add(DisplayData.item("transport", this.pubsubFactory.getKind()));
            builder.addIfNotNull(DisplayData.item("timestampLabel", this.timestampLabel));
            builder.addIfNotNull(DisplayData.item("idLabel", this.idLabel));
        }
    }

    private static class ShardFn<T>
    extends DoFn<T, KV<Integer, PubsubClient.OutgoingMessage>> {
        private final Aggregator<Long, Long> elementCounter = this.createAggregator("elements", new Sum.SumLongFn());
        private final Coder<T> elementCoder;
        private final int numShards;
        private final RecordIdMethod recordIdMethod;

        ShardFn(Coder<T> elementCoder, int numShards, RecordIdMethod recordIdMethod) {
            this.elementCoder = elementCoder;
            this.numShards = numShards;
            this.recordIdMethod = recordIdMethod;
        }

        @Override
        public void processElement(DoFn.ProcessContext c) throws Exception {
            this.elementCounter.addValue(1L);
            byte[] elementBytes = CoderUtils.encodeToByteArray(this.elementCoder, c.element());
            long timestampMsSinceEpoch = c.timestamp().getMillis();
            String recordId = null;
            switch (this.recordIdMethod) {
                case NONE: {
                    break;
                }
                case DETERMINISTIC: {
                    recordId = Hashing.murmur3_128().hashBytes(elementBytes).toString();
                    break;
                }
                case RANDOM: {
                    recordId = UUID.randomUUID().toString();
                }
            }
            c.output(KV.of(ThreadLocalRandom.current().nextInt(this.numShards), new PubsubClient.OutgoingMessage(elementBytes, timestampMsSinceEpoch, recordId)));
        }

        @Override
        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            builder.add(DisplayData.item("numShards", this.numShards));
        }
    }

    @VisibleForTesting
    static enum RecordIdMethod {
        NONE,
        RANDOM,
        DETERMINISTIC;

    }

    private static class OutgoingMessageCoder
    extends CustomCoder<PubsubClient.OutgoingMessage> {
        private static final NullableCoder<String> RECORD_ID_CODER = NullableCoder.of(StringUtf8Coder.of());

        private OutgoingMessageCoder() {
        }

        @Override
        public void encode(PubsubClient.OutgoingMessage value, OutputStream outStream, Coder.Context context) throws CoderException, IOException {
            ByteArrayCoder.of().encode(value.elementBytes, outStream, Coder.Context.NESTED);
            BigEndianLongCoder.of().encode(value.timestampMsSinceEpoch, outStream, Coder.Context.NESTED);
            RECORD_ID_CODER.encode(value.recordId, outStream, Coder.Context.NESTED);
        }

        @Override
        public PubsubClient.OutgoingMessage decode(InputStream inStream, Coder.Context context) throws CoderException, IOException {
            byte[] elementBytes = ByteArrayCoder.of().decode(inStream, Coder.Context.NESTED);
            long timestampMsSinceEpoch = BigEndianLongCoder.of().decode(inStream, Coder.Context.NESTED);
            String recordId = RECORD_ID_CODER.decode(inStream, Coder.Context.NESTED);
            return new PubsubClient.OutgoingMessage(elementBytes, timestampMsSinceEpoch, recordId);
        }
    }
}

