/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.dataflow.sdk.transforms;

import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException;
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.CoderRegistry;
import com.google.cloud.dataflow.sdk.coders.KvCoder;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.sdk.values.TypeDescriptor;

/**
 * A {@link PTransform} that turns a {@code PCollection<V>} into a
 * {@code PCollection<KV<K, V>>} by pairing every input element with a key.
 *
 * <p>The key is either computed per element by a {@link SerializableFunction}
 * (see {@link #of(SerializableFunction)}) or is one constant value shared by all
 * elements (see {@link #of(Object)}).
 *
 * @param <K> the type of the keys in the output
 * @param <V> the type of the input elements, which become the output values
 */
public class WithKeys<K, V>
extends PTransform<PCollection<V>, PCollection<KV<K, V>>> {
    /** Function applied to each element to obtain its key. */
    private final SerializableFunction<V, K> fn;

    /**
     * Raw class of the key, or {@code null} when it is not known. Declared
     * {@code transient}, so it is not part of the serialized form of this transform.
     */
    private final transient Class<K> keyClass;

    /**
     * Returns a {@code WithKeys} that computes the key of each element by applying
     * {@code fn} to it. No key class is recorded; use {@link #withKeyType} to supply one.
     *
     * @param fn the function mapping an element to its key
     * @return a transform that keys each element by {@code fn.apply(element)}
     */
    public static <K, V> WithKeys<K, V> of(SerializableFunction<V, K> fn) {
        return new WithKeys<K, V>(fn, null);
    }

    /**
     * Returns a {@code WithKeys} that pairs every element with the same constant
     * {@code key}. The runtime class of a non-null {@code key} is recorded as the key class.
     *
     * @param key the key given to every element; may be {@code null}, in which case no
     *     key class is recorded
     * @return a transform that keys each element by {@code key}
     */
    public static <K, V> WithKeys<K, V> of(final K key) {
        // key.getClass() is the runtime class of a K instance; it is only ever used as a
        // lookup token for the key coder, so treating it as Class<K> is acceptable here.
        @SuppressWarnings("unchecked")
        Class<K> keyClass = key == null ? null : (Class<K>) key.getClass();
        return new WithKeys<K, V>(new SerializableFunction<V, K>() {
            @Override
            public K apply(V value) {
                return key;
            }
        }, keyClass);
    }

    /**
     * Creates the transform.
     *
     * @param fn the function mapping an element to its key
     * @param keyClass the raw key class, or {@code null} if unknown
     */
    private WithKeys(SerializableFunction<V, K> fn, Class<K> keyClass) {
        this.fn = fn;
        this.keyClass = keyClass;
    }

    /**
     * Returns a new {@code WithKeys} that uses the same key function as this one but
     * records the raw type of {@code keyType} as the key class.
     *
     * @param keyType descriptor of the key type
     * @return a new transform carrying the given key type; this instance is not modified
     */
    public WithKeys<K, V> withKeyType(TypeDescriptor<K> keyType) {
        // The raw type of a TypeDescriptor<K> is the erasure of K, so the cast is safe
        // for the purpose of identifying the key class.
        @SuppressWarnings("unchecked")
        Class<K> rawType = (Class<K>) keyType.getRawType();
        return new WithKeys<K, V>(this.fn, rawType);
    }

    /**
     * Applies a {@link ParDo} named {@code "AddKeys"} that emits
     * {@code KV.of(fn.apply(element), element)} for every input element, then tries to
     * set a {@link KvCoder} on the result built from an inferred key coder and the
     * input's coder.
     *
     * @param in the collection of values to key
     * @return the keyed collection
     */
    @Override
    public PCollection<KV<K, V>> apply(PCollection<V> in) {
        PCollection<KV<K, V>> result = in.apply(ParDo.named("AddKeys").of(new DoFn<V, KV<K, V>>() {
            @Override
            public void processElement(DoFn<V, KV<K, V>>.ProcessContext c) {
                c.output(KV.of(WithKeys.this.fn.apply(c.element()), c.element()));
            }
        }));
        try {
            CoderRegistry coderRegistry = in.getPipeline().getCoderRegistry();
            Coder<K> keyCoder;
            if (this.keyClass == null) {
                // No key class recorded: ask the registry to infer the coder from the
                // key function and the input coder.
                keyCoder = coderRegistry.getDefaultOutputCoder(this.fn, in.getCoder());
            } else {
                keyCoder = coderRegistry.getDefaultCoder(TypeDescriptor.of(this.keyClass));
            }
            result.setCoder(KvCoder.of(keyCoder, in.getCoder()));
        } catch (CannotProvideCoderException ignored) {
            // Deliberately best-effort: when no key coder can be provided here, the result
            // is returned without an explicit coder rather than failing the transform.
            // NOTE(review): presumably coder inference is retried later when the coder is
            // first requested - confirm against PCollection.getCoder().
        }
        return result;
    }
}

