/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.dataflow.sdk.transforms.join;

import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.KvCoder;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.Flatten;
import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.join.CoGbkResult;
import com.google.cloud.dataflow.sdk.transforms.join.CoGbkResultSchema;
import com.google.cloud.dataflow.sdk.transforms.join.KeyedPCollectionTuple;
import com.google.cloud.dataflow.sdk.transforms.join.RawUnionValue;
import com.google.cloud.dataflow.sdk.transforms.join.UnionCoder;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.sdk.values.PCollectionList;
import java.util.ArrayList;
import java.util.List;

/**
 * A {@link PTransform} that takes a {@link KeyedPCollectionTuple} of keyed
 * {@link PCollection}s sharing the key type {@code K} and produces a single
 * {@code PCollection<KV<K, CoGbkResult>>}.
 *
 * <p>The transform is assembled from four steps:
 * <ol>
 *   <li>every input element's value is wrapped in a {@link RawUnionValue}
 *       tagged with the position of its input collection;</li>
 *   <li>the tagged collections are flattened into one collection;</li>
 *   <li>the flattened collection is grouped by key;</li>
 *   <li>each group is wrapped in a {@link CoGbkResult} built with the
 *       input's {@link CoGbkResultSchema}.</li>
 * </ol>
 *
 * @param <K> the key type shared by all input collections and by the output
 */
public class CoGroupByKey<K>
extends PTransform<KeyedPCollectionTuple<K>, PCollection<KV<K, CoGbkResult>>> {
    /**
     * Returns a new {@code CoGroupByKey} transform.
     *
     * @param <K> the key type of the input and output collections
     * @return a fresh transform instance
     */
    public static <K> CoGroupByKey<K> create() {
        return new CoGroupByKey<K>();
    }

    /** Instances are obtained through {@link #create()}. */
    private CoGroupByKey() {
    }

    /**
     * Builds the co-grouping pipeline fragment for {@code input}.
     *
     * @param input the keyed collections to co-group
     * @return the collection of per-key {@link CoGbkResult}s, with its coder set
     * @throws IllegalArgumentException if {@code input} is empty, or if one of
     *         the input collections does not use a {@link KvCoder}
     */
    @Override
    public PCollection<KV<K, CoGbkResult>> apply(KeyedPCollectionTuple<K> input) {
        if (input.isEmpty()) {
            throw new IllegalArgumentException("must have at least one input to a KeyedPCollections");
        }

        // Collect one value coder per input, in iteration order, so that the
        // coder at position i corresponds to union tag i assigned below.
        List<Coder<?>> codersList = new ArrayList<Coder<?>>();
        for (KeyedPCollectionTuple.TaggedKeyedPCollection<K, ?> entry : input.getKeyedCollections()) {
            codersList.add(this.getValueCoder(entry.pCollection));
        }
        UnionCoder unionCoder = UnionCoder.of(codersList);
        Coder<K> keyCoder = input.getKeyCoder();
        KvCoder<K, RawUnionValue> kVCoder = KvCoder.of(keyCoder, unionCoder);

        // Tag every input with its position and gather the tagged tables.
        // NOTE(review): this presumably relies on the CoGbkResultSchema listing
        // its tags in the same order as getKeyedCollections() - confirm.
        PCollectionList<KV<K, RawUnionValue>> unionTables = PCollectionList.empty(input.getPipeline());
        int index = 0;
        for (KeyedPCollectionTuple.TaggedKeyedPCollection<K, ?> entry : input.getKeyedCollections()) {
            PCollection<KV<K, RawUnionValue>> unionTable = this.makeUnionTable(index, entry.pCollection, kVCoder);
            unionTables = unionTables.and(unionTable);
            index++;
        }

        PCollection<KV<K, RawUnionValue>> flattenedTable =
            unionTables.apply(Flatten.<KV<K, RawUnionValue>>pCollections());
        PCollection<KV<K, Iterable<RawUnionValue>>> groupedTable =
            flattenedTable.apply(GroupByKey.<K, RawUnionValue>create());

        CoGbkResultSchema tupleTags = input.getCoGbkResultSchema();
        PCollection<KV<K, CoGbkResult>> result = groupedTable.apply(
            ParDo.of(new ConstructCoGbkResultFn<K>(tupleTags)).named("ConstructCoGbkResultFn"));
        result.setCoder(KvCoder.of(keyCoder, CoGbkResult.CoGbkResultCoder.of(tupleTags, unionCoder)));
        return result;
    }

    /**
     * Extracts the value coder from a keyed collection's {@link KvCoder}.
     *
     * @param pCollection the keyed collection to inspect
     * @param <V> the value type of the collection
     * @return the coder for the collection's values
     * @throws IllegalArgumentException if the collection's coder is not a {@link KvCoder}
     */
    private <V> Coder<V> getValueCoder(PCollection<KV<K, V>> pCollection) {
        Coder<?> entryCoder = pCollection.getCoder();
        if (!(entryCoder instanceof KvCoder)) {
            throw new IllegalArgumentException("PCollection does not use a KvCoder");
        }
        // The instanceof check above establishes the raw class; the type
        // arguments follow from the collection's declared element type.
        @SuppressWarnings("unchecked")
        KvCoder<K, V> coder = (KvCoder<K, V>) entryCoder;
        return coder.getValueCoder();
    }

    /**
     * Converts a keyed collection into a collection of keyed {@link RawUnionValue}s
     * tagged with {@code index}.
     *
     * @param index the union tag to attach to every value of this collection
     * @param pCollection the keyed collection to convert
     * @param unionTableEncoder the coder to set on the resulting collection
     * @param <V> the value type of the input collection
     * @return the tagged collection, with {@code unionTableEncoder} as its coder
     */
    private <V> PCollection<KV<K, RawUnionValue>> makeUnionTable(int index, PCollection<KV<K, V>> pCollection, KvCoder<K, RawUnionValue> unionTableEncoder) {
        return pCollection
            .apply(ParDo.of(new ConstructUnionTableFn<K, V>(index)).named("MakeUnionTable" + index))
            .setCoder(unionTableEncoder);
    }

    /**
     * Wraps each grouped iterable of {@link RawUnionValue}s in a
     * {@link CoGbkResult} built with a fixed schema, keeping the key.
     *
     * @param <K> the key type
     */
    private static class ConstructCoGbkResultFn<K>
    extends DoFn<KV<K, Iterable<RawUnionValue>>, KV<K, CoGbkResult>> {
        private final CoGbkResultSchema schema;

        /**
         * @param schema the schema passed to every {@link CoGbkResult} this function creates
         */
        public ConstructCoGbkResultFn(CoGbkResultSchema schema) {
            this.schema = schema;
        }

        /** Emits the element's key paired with a {@link CoGbkResult} over its grouped values. */
        @Override
        public void processElement(ProcessContext c) {
            KV<K, Iterable<RawUnionValue>> e = c.element();
            c.output(KV.of(e.getKey(), new CoGbkResult(this.schema, e.getValue())));
        }
    }

    /**
     * Wraps each element's value in a {@link RawUnionValue} carrying a fixed
     * union tag, keeping the key.
     *
     * @param <K> the key type
     * @param <V> the value type of the input elements
     */
    private static class ConstructUnionTableFn<K, V>
    extends DoFn<KV<K, V>, KV<K, RawUnionValue>> {
        private final int index;

        /**
         * @param index the union tag attached to every value this function emits
         */
        public ConstructUnionTableFn(int index) {
            this.index = index;
        }

        /** Emits the element's key paired with its value tagged by {@code index}. */
        @Override
        public void processElement(ProcessContext c) {
            KV<K, V> e = c.element();
            c.output(KV.of(e.getKey(), new RawUnionValue(this.index, e.getValue())));
        }
    }
}

