/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.dataflow.sdk.io;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.api.client.json.JsonFactory;
import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.model.Job;
import com.google.api.services.bigquery.model.JobConfigurationExtract;
import com.google.api.services.bigquery.model.JobConfigurationLoad;
import com.google.api.services.bigquery.model.JobConfigurationQuery;
import com.google.api.services.bigquery.model.JobConfigurationTableCopy;
import com.google.api.services.bigquery.model.JobReference;
import com.google.api.services.bigquery.model.JobStatistics;
import com.google.api.services.bigquery.model.JobStatus;
import com.google.api.services.bigquery.model.QueryRequest;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.coders.AtomicCoder;
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.CoderException;
import com.google.cloud.dataflow.sdk.coders.KvCoder;
import com.google.cloud.dataflow.sdk.coders.StandardCoder;
import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
import com.google.cloud.dataflow.sdk.coders.TableRowJsonCoder;
import com.google.cloud.dataflow.sdk.coders.VarIntCoder;
import com.google.cloud.dataflow.sdk.coders.VoidCoder;
import com.google.cloud.dataflow.sdk.io.AvroSource;
import com.google.cloud.dataflow.sdk.io.BoundedSource;
import com.google.cloud.dataflow.sdk.options.BigQueryOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.annotations.VisibleForTesting;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Function;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.MoreObjects;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Strings;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.ImmutableList;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.collect.Lists;
import com.google.cloud.dataflow.sdk.repackaged.com.google.common.io.CountingOutputStream;
import com.google.cloud.dataflow.sdk.transforms.Aggregator;
import com.google.cloud.dataflow.sdk.transforms.Create;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
import com.google.cloud.dataflow.sdk.transforms.Sum;
import com.google.cloud.dataflow.sdk.transforms.View;
import com.google.cloud.dataflow.sdk.transforms.display.DisplayData;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.transforms.windowing.DefaultTrigger;
import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows;
import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
import com.google.cloud.dataflow.sdk.util.AvroUtils;
import com.google.cloud.dataflow.sdk.util.BigQueryServices;
import com.google.cloud.dataflow.sdk.util.BigQueryServicesImpl;
import com.google.cloud.dataflow.sdk.util.BigQueryTableInserter;
import com.google.cloud.dataflow.sdk.util.BigQueryTableRowIterator;
import com.google.cloud.dataflow.sdk.util.FileIOChannelFactory;
import com.google.cloud.dataflow.sdk.util.GcsIOChannelFactory;
import com.google.cloud.dataflow.sdk.util.GcsUtil;
import com.google.cloud.dataflow.sdk.util.IOChannelFactory;
import com.google.cloud.dataflow.sdk.util.IOChannelUtils;
import com.google.cloud.dataflow.sdk.util.Reshuffle;
import com.google.cloud.dataflow.sdk.util.SystemDoFnInternal;
import com.google.cloud.dataflow.sdk.util.Transport;
import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.sdk.values.PCollectionTuple;
import com.google.cloud.dataflow.sdk.values.PCollectionView;
import com.google.cloud.dataflow.sdk.values.PDone;
import com.google.cloud.dataflow.sdk.values.PInput;
import com.google.cloud.dataflow.sdk.values.TupleTag;
import com.google.cloud.dataflow.sdk.values.TupleTagList;
import com.google.cloud.hadoop.util.ApiErrorExtractor;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import org.apache.avro.generic.GenericRecord;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BigQueryIO {
    // Logger shared by BigQueryIO and its nested classes.
    private static final Logger LOG = LoggerFactory.getLogger(BigQueryIO.class);
    // JSON factory used to serialize and deserialize BigQuery model objects.
    private static final JsonFactory JSON_FACTORY = Transport.getJsonFactory();
    // Project id: a lowercase letter, then 4-61 characters from [-a-z0-9:.], then a lowercase letter or digit.
    private static final String PROJECT_ID_REGEXP = "[a-z][-a-z0-9:.]{4,61}[a-z0-9]";
    // Dataset id: 1-1024 characters, each a word character, '-' or '.'.
    private static final String DATASET_REGEXP = "[-\\w.]{1,1024}";
    // Table id: 1-1024 characters, each a word character, '-', '$' or '@'.
    private static final String TABLE_REGEXP = "[-\\w$@]{1,1024}";
    // Full table spec: an optional "<project>:" prefix followed by "<dataset>.<table>", exposed through the
    // named groups PROJECT, DATASET and TABLE. The three sub-patterns repeat the literals declared above.
    private static final String DATASET_TABLE_REGEXP = String.format("((?<PROJECT>%s):)?(?<DATASET>%s)\\.(?<TABLE>%s)", "[a-z][-a-z0-9:.]{4,61}[a-z0-9]", "[-\\w.]{1,1024}", "[-\\w$@]{1,1024}");
    // Compiled once; used by parseTableSpec.
    private static final Pattern TABLE_SPEC = Pattern.compile(DATASET_TABLE_REGEXP);
    // Format arguments: %1$s dataset, %2$s table, %3$s assumed project.
    @Deprecated
    public static final String SET_PROJECT_FROM_OPTIONS_WARNING = "No project specified for BigQuery table \"%1$s.%2$s\". Assuming it is in \"%3$s\". If the table is in a different project please specify it as a part of the BigQuery table definition.";
    // Format arguments: %1$s resource kind ("dataset" or "table"), %2$s table spec.
    private static final String RESOURCE_NOT_FOUND_ERROR = "BigQuery %1$s not found for table \"%2$s\" . Please create the %1$s before pipeline execution. If the %1$s is created by an earlier stage of the pipeline, this validation can be disabled using #withoutValidation.";
    // Format arguments: %1$s resource kind ("dataset" or "table"), %2$s table spec.
    private static final String UNABLE_TO_CONFIRM_PRESENCE_OF_RESOURCE_ERROR = "Unable to confirm BigQuery %1$s presence for table \"%2$s\". If the %1$s is created by an earlier stage of the pipeline, this validation can be disabled using #withoutValidation.";

    /**
     * Parses a textual table specification into a {@link TableReference}.
     *
     * <p>The accepted form is {@code [project_id]:[dataset_id].[table_id]}, where the
     * {@code project_id:} prefix is optional.
     *
     * @param tableSpec the table specification to parse
     * @return a reference carrying the dataset id and table id; its project id is
     *     {@code null} when the specification has no project prefix
     * @throws IllegalArgumentException if {@code tableSpec} does not match the expected form
     */
    public static TableReference parseTableSpec(String tableSpec) {
        Matcher specMatcher = TABLE_SPEC.matcher(tableSpec);
        if (specMatcher.matches()) {
            TableReference parsed = new TableReference();
            parsed.setProjectId(specMatcher.group("PROJECT"));
            parsed.setDatasetId(specMatcher.group("DATASET"));
            return parsed.setTableId(specMatcher.group("TABLE"));
        }
        throw new IllegalArgumentException("Table reference is not in [project_id]:[dataset_id].[table_id] format: " + tableSpec);
    }

    /**
     * Renders a {@link TableReference} as a table specification string.
     *
     * @param ref the table reference to render
     * @return {@code project:dataset.table} when the reference has a project id,
     *     otherwise {@code dataset.table}
     */
    public static String toTableSpec(TableReference ref) {
        String projectId = ref.getProjectId();
        // The "project:" prefix is emitted only when a project id is present.
        String projectPrefix = projectId == null ? "" : projectId + ":";
        return projectPrefix + ref.getDatasetId() + '.' + ref.getTableId();
    }

    /**
     * Derives the id of an extract job from a job id token.
     *
     * @param jobIdToken the token the job id is based on
     * @return the token followed by {@code -extract}
     */
    private static String getExtractJobId(String jobIdToken) {
        StringBuilder jobId = new StringBuilder();
        jobId.append(jobIdToken);
        jobId.append("-extract");
        return jobId.toString();
    }

    /**
     * Builds the wildcard URI that matches the Avro files inside an extract directory.
     *
     * @param extractDestinationDir the directory the files are placed in
     * @return the directory followed by {@code /*.avro}
     */
    private static String getExtractDestinationUri(String extractDestinationDir) {
        return extractDestinationDir + "/" + "*.avro";
    }

    /**
     * Lists the paths of the files produced by an extract job.
     *
     * <p>The job statistics must report exactly one destination-URI file count. The
     * returned paths are {@code 000000000000.avro}, {@code 000000000001.avro}, ... (one per
     * counted file), each resolved against {@code extractDestinationDir}.
     *
     * @param extractDestinationDir the directory the extract job wrote into
     * @param extractJob the extract job whose statistics carry the file count
     * @return the resolved file paths, in ascending index order
     * @throws IOException if the channel factory for the directory cannot be obtained or a
     *     path cannot be resolved
     * @throws RuntimeException if the job reports no file count or more than one
     */
    private static List<String> getExtractFilePaths(String extractDestinationDir, Job extractJob) throws IOException {
        JobStatistics jobStats = extractJob.getStatistics();
        List<Long> counts = jobStats.getExtract().getDestinationUriFileCounts();
        // A missing list is reported the same way as an empty one instead of failing with a bare NPE.
        if (counts == null || counts.isEmpty()) {
            throw new RuntimeException("No destination uri file count received.");
        }
        if (counts.size() > 1) {
            throw new RuntimeException(String.format("More than one destination uri file count received. First two are %s, %s", counts.get(0), counts.get(1)));
        }
        long filesCount = counts.get(0);
        ImmutableList.Builder<String> paths = ImmutableList.builder();
        IOChannelFactory factory = IOChannelUtils.getFactory(extractDestinationDir);
        for (long i = 0L; i < filesCount; ++i) {
            // File names are the zero-padded, 12-digit file index with an ".avro" suffix.
            String filePath = factory.resolve(extractDestinationDir, String.format("%012d%s", i, ".avro"));
            paths.add(filePath);
        }
        return paths.build();
    }

    /**
     * Checks that the dataset named by {@code table} can be fetched from BigQuery.
     *
     * @param options options used to build the BigQuery client
     * @param table reference whose project id and dataset id identify the dataset
     * @throws IllegalArgumentException if the lookup fails with an {@link IOException} that
     *     {@link ApiErrorExtractor#itemNotFound} classifies as "not found"
     * @throws RuntimeException if the lookup fails for any other reason
     */
    private static void verifyDatasetPresence(BigQueryOptions options, TableReference table) {
        String resourceNotFoundMsg = String.format(RESOURCE_NOT_FOUND_ERROR, "dataset", BigQueryIO.toTableSpec(table));
        try {
            Bigquery client = Transport.newBigQueryClient(options).build();
            BigQueryTableRowIterator.executeWithBackOff(client.datasets().get(table.getProjectId(), table.getDatasetId()), resourceNotFoundMsg);
        }
        catch (Exception e) {
            if (e instanceof InterruptedException) {
                // The exception is wrapped below; restore the flag so callers can still observe the interrupt.
                Thread.currentThread().interrupt();
            }
            ApiErrorExtractor errorExtractor = new ApiErrorExtractor();
            if (e instanceof IOException && errorExtractor.itemNotFound((IOException)e)) {
                throw new IllegalArgumentException(resourceNotFoundMsg, e);
            }
            throw new RuntimeException(String.format(UNABLE_TO_CONFIRM_PRESENCE_OF_RESOURCE_ERROR, "dataset", BigQueryIO.toTableSpec(table)), e);
        }
    }

    /**
     * Checks that the table named by {@code table} can be fetched from BigQuery.
     *
     * @param options options used to build the BigQuery client
     * @param table reference whose project, dataset and table ids identify the table
     * @throws IllegalArgumentException if the lookup fails with an {@link IOException} that
     *     {@link ApiErrorExtractor#itemNotFound} classifies as "not found"
     * @throws RuntimeException if the lookup fails for any other reason
     */
    private static void verifyTablePresence(BigQueryOptions options, TableReference table) {
        String resourceNotFoundMsg = String.format(RESOURCE_NOT_FOUND_ERROR, "table", BigQueryIO.toTableSpec(table));
        try {
            Bigquery client = Transport.newBigQueryClient(options).build();
            BigQueryTableRowIterator.executeWithBackOff(client.tables().get(table.getProjectId(), table.getDatasetId(), table.getTableId()), resourceNotFoundMsg);
        }
        catch (Exception e) {
            if (e instanceof InterruptedException) {
                // The exception is wrapped below; restore the flag so callers can still observe the interrupt.
                Thread.currentThread().interrupt();
            }
            ApiErrorExtractor errorExtractor = new ApiErrorExtractor();
            if (e instanceof IOException && errorExtractor.itemNotFound((IOException)e)) {
                throw new IllegalArgumentException(resourceNotFoundMsg, e);
            }
            throw new RuntimeException(String.format(UNABLE_TO_CONFIRM_PRESENCE_OF_RESOURCE_ERROR, "table", BigQueryIO.toTableSpec(table)), e);
        }
    }

    /**
     * Classifies the outcome of a BigQuery job.
     *
     * @param job the job to inspect, or {@code null} if none is available
     * @return {@link Status#UNKNOWN} for a {@code null} job; {@link Status#FAILED} when the
     *     job status carries an error result or a non-empty error list; otherwise
     *     {@link Status#SUCCEEDED}
     */
    private static Status parseStatus(@Nullable Job job) {
        if (job == null) {
            return Status.UNKNOWN;
        }
        JobStatus jobStatus = job.getStatus();
        boolean failed = jobStatus.getErrorResult() != null
                || (jobStatus.getErrors() != null && !jobStatus.getErrors().isEmpty());
        return failed ? Status.FAILED : Status.SUCCEEDED;
    }

    /**
     * Serializes an object to a JSON string using the shared JSON factory.
     *
     * @param item the object to serialize; may be {@code null}
     * @return the JSON text, or {@code null} when {@code item} is {@code null}
     * @throws RuntimeException wrapping the {@link IOException} if serialization fails
     */
    @VisibleForTesting
    static String toJsonString(Object item) {
        if (item != null) {
            try {
                return JSON_FACTORY.toString(item);
            }
            catch (IOException serializationFailure) {
                String message = String.format("Cannot serialize %s to a JSON string.", item.getClass().getSimpleName());
                throw new RuntimeException(message, serializationFailure);
            }
        }
        return null;
    }

    /**
     * Deserializes a JSON string into an instance of the given class using the shared
     * JSON factory.
     *
     * @param json the JSON text; may be {@code null}
     * @param clazz the class to deserialize into
     * @return the deserialized object, or {@code null} when {@code json} is {@code null}
     * @throws RuntimeException wrapping the {@link IOException} if deserialization fails
     */
    @VisibleForTesting
    static <T> T fromJsonString(String json, Class<T> clazz) {
        if (json != null) {
            try {
                return JSON_FACTORY.fromString(json, clazz);
            }
            catch (IOException parseFailure) {
                String message = String.format("Cannot deserialize %s from a JSON string: %s.", clazz, json);
                throw new RuntimeException(message, parseFailure);
            }
        }
        return null;
    }

    /**
     * Generates a random UUID rendered without its dash separators.
     *
     * @return the textual form of a random {@link UUID} with every {@code '-'} removed
     */
    private static String randomUUIDString() {
        String dashed = UUID.randomUUID().toString();
        return dashed.replace("-", "");
    }

    // Private constructor: code outside this class cannot instantiate BigQueryIO.
    private BigQueryIO() {
    }

    /**
     * Returns the list stored under {@code key}, creating and storing an empty one first if
     * the map currently yields {@code null} for that key.
     *
     * @param map the map of lists to read from and, if needed, update
     * @param key the key to look up
     * @return the list associated with {@code key} in {@code map}; never {@code null}
     */
    private static <K, V> List<V> getOrCreateMapListValue(Map<K, List<V>> map, K key) {
        List<V> existing = map.get(key);
        if (existing != null) {
            return existing;
        }
        List<V> created = new ArrayList<V>();
        map.put(key, created);
        return created;
    }

    /**
     * Coarse outcome of a BigQuery job, as produced by {@code parseStatus}.
     */
    static enum Status {
        /** The job status carried neither an error result nor any errors. */
        SUCCEEDED,
        /** The job status carried an error result or a non-empty error list. */
        FAILED,
        /** No job object was available to inspect. */
        UNKNOWN;

    }

    /**
     * Transform that writes {@link TableRow}s through {@link StreamingWriteFn}.
     *
     * <p>Each row is first tagged with a unique id and a sharded destination-table key by
     * {@link TagWithUniqueIdsAndTable}, then passed through a {@link Reshuffle} before being
     * written.
     */
    private static class StreamWithDeDup
    extends PTransform<PCollection<TableRow>, PDone> {
        private final transient TableReference tableReference;
        private final SerializableFunction<BoundedWindow, TableReference> tableRefFunction;
        private final transient TableSchema tableSchema;

        /**
         * @param tableReference fixed destination table, handed to the tagging step
         * @param tableRefFunction per-window destination function, handed to the tagging step
         * @param tableSchema schema handed to the writing step
         */
        StreamWithDeDup(TableReference tableReference, SerializableFunction<BoundedWindow, TableReference> tableRefFunction, TableSchema tableSchema) {
            this.tableSchema = tableSchema;
            this.tableRefFunction = tableRefFunction;
            this.tableReference = tableReference;
        }

        @Override
        protected Coder<Void> getDefaultOutputCoder() {
            return VoidCoder.of();
        }

        /**
         * Wires up the tag, reshuffle and write steps on {@code input}.
         *
         * @param input the rows to write
         * @return a {@link PDone} in the pipeline of {@code input}
         */
        @Override
        public PDone apply(PCollection<TableRow> input) {
            Pipeline pipeline = input.getPipeline();
            BigQueryOptions options = pipeline.getOptions().as(BigQueryOptions.class);

            // Step 1: attach a unique id and a sharded table key to every row.
            PCollection<KV<ShardedKey<String>, TableRowInfo>> tagged =
                    input.apply(ParDo.of(new TagWithUniqueIdsAndTable(options, this.tableReference, this.tableRefFunction)));
            tagged.setCoder(KvCoder.of(ShardedKeyCoder.of(StringUtf8Coder.of()), TableRowInfoCoder.of()));

            // Step 2: reshuffle, then hand the tagged rows to the writer.
            tagged
                    .apply(Reshuffle.<ShardedKey<String>, TableRowInfo>of())
                    .apply(ParDo.of(new StreamingWriteFn(this.tableSchema)));
            return PDone.in(pipeline);
        }
    }

    /**
     * Tags each {@link TableRow} with a unique id and a sharded key naming its destination
     * table.
     *
     * <p>The destination is either a fixed table spec computed at construction time or the
     * result of applying a function to the element's window.
     */
    private static class TagWithUniqueIdsAndTable
    extends DoFn<TableRow, KV<ShardedKey<String>, TableRowInfo>>
    implements DoFn.RequiresWindowAccess {
        private final String tableSpec;
        private final SerializableFunction<BoundedWindow, TableReference> tableRefFunction;
        private transient String randomUUID;
        private transient long sequenceNo = 0L;

        /**
         * @param options supplies the default project when {@code table} has none
         * @param table fixed destination table, or {@code null}; its project id is filled in
         *     from {@code options} when missing
         * @param tableRefFunction per-window destination function, or {@code null}
         * @throws IllegalArgumentException unless exactly one of {@code table} and
         *     {@code tableRefFunction} is non-null
         */
        TagWithUniqueIdsAndTable(BigQueryOptions options, TableReference table, SerializableFunction<BoundedWindow, TableReference> tableRefFunction) {
            boolean hasTable = table != null;
            boolean hasFunction = tableRefFunction != null;
            Preconditions.checkArgument(hasTable != hasFunction, "Exactly one of table or tableRefFunction should be set");
            if (hasTable && table.getProjectId() == null) {
                table.setProjectId(options.as(BigQueryOptions.class).getProject());
            }
            this.tableSpec = hasTable ? BigQueryIO.toTableSpec(table) : null;
            this.tableRefFunction = tableRefFunction;
        }

        /** Picks a fresh random UUID that prefixes the unique ids of this bundle's rows. */
        @Override
        public void startBundle(DoFn.Context context) {
            this.randomUUID = UUID.randomUUID().toString();
        }

        /**
         * Emits the element keyed by its destination table spec and a random shard number in
         * {@code [0, 50)}, together with a unique id made of the bundle UUID and a running
         * sequence number.
         */
        @Override
        public void processElement(DoFn.ProcessContext context) throws IOException {
            String rowId = this.randomUUID + this.sequenceNo++;
            ThreadLocalRandom shardPicker = ThreadLocalRandom.current();
            BigQueryOptions options = context.getPipelineOptions().as(BigQueryOptions.class);
            String destination = this.tableSpecFromWindow(options, context.window());
            ShardedKey<String> shardedKey = ShardedKey.of(destination, shardPicker.nextInt(0, 50));
            TableRowInfo rowInfo = new TableRowInfo((TableRow)context.element(), rowId);
            context.output(KV.of(shardedKey, rowInfo));
        }

        @Override
        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            builder.addIfNotNull(DisplayData.item("table", this.tableSpec));
            if (this.tableRefFunction == null) {
                return;
            }
            builder.add(DisplayData.item("tableFn", this.tableRefFunction.getClass()).withLabel("Table Reference Function"));
        }

        /**
         * Resolves the destination table spec for a window.
         *
         * @return the fixed table spec if one was configured; otherwise the spec of the table
         *     returned by the table function, with its project id defaulted from
         *     {@code options} when missing
         */
        private String tableSpecFromWindow(BigQueryOptions options, BoundedWindow window) {
            if (this.tableSpec == null) {
                TableReference windowTable = this.tableRefFunction.apply(window);
                if (windowTable.getProjectId() == null) {
                    windowTable.setProjectId(options.getProject());
                }
                return BigQueryIO.toTableSpec(windowTable);
            }
            return this.tableSpec;
        }
    }

    /**
     * Pairs a {@link TableRow} with the unique id assigned to it.
     */
    private static class TableRowInfo {
        /** The row itself. */
        final TableRow tableRow;
        /** The unique id assigned to the row. */
        final String uniqueId;

        /**
         * @param tableRow the row
         * @param uniqueId the unique id assigned to the row
         */
        TableRowInfo(TableRow tableRow, String uniqueId) {
            this.uniqueId = uniqueId;
            this.tableRow = tableRow;
        }
    }

    /**
     * Coder for {@link TableRowInfo}: encodes the table row followed by the unique id, both
     * in the nested context.
     */
    private static class TableRowInfoCoder
    extends AtomicCoder<TableRowInfo> {
        private static final TableRowInfoCoder INSTANCE = new TableRowInfoCoder();
        TableRowJsonCoder tableRowCoder = TableRowJsonCoder.of();
        StringUtf8Coder idCoder = StringUtf8Coder.of();

        private TableRowInfoCoder() {
        }

        /** Returns the shared coder instance. */
        @JsonCreator
        public static TableRowInfoCoder of() {
            return INSTANCE;
        }

        /**
         * Writes the row and then the unique id of {@code value}.
         *
         * @throws CoderException if {@code value} is {@code null}
         */
        @Override
        public void encode(TableRowInfo value, OutputStream outStream, Coder.Context context) throws IOException {
            if (value == null) {
                throw new CoderException("cannot encode a null value");
            }
            Coder.Context nestedContext = context.nested();
            this.tableRowCoder.encode(value.tableRow, outStream, nestedContext);
            this.idCoder.encode(value.uniqueId, outStream, nestedContext);
        }

        /** Reads the row and then the unique id, in the order {@code encode} wrote them. */
        @Override
        public TableRowInfo decode(InputStream inStream, Coder.Context context) throws IOException {
            TableRow decodedRow = this.tableRowCoder.decode(inStream, context.nested());
            String decodedId = this.idCoder.decode(inStream, context.nested());
            return new TableRowInfo(decodedRow, decodedId);
        }

        /** Always throws: this coder declares itself non-deterministic. */
        @Override
        public void verifyDeterministic() throws Coder.NonDeterministicException {
            throw new Coder.NonDeterministicException(this, "TableRows are not deterministic.");
        }
    }

    /**
     * Coder for {@link ShardedKey}: encodes the key with the supplied key coder (nested
     * context) followed by the shard number as a var-int (outer context).
     */
    private static class ShardedKeyCoder<KeyT>
    extends StandardCoder<ShardedKey<KeyT>> {
        Coder<KeyT> keyCoder;
        VarIntCoder shardNumberCoder;

        /** Creates a coder that uses {@code keyCoder} for the key part. */
        public static <KeyT> ShardedKeyCoder<KeyT> of(Coder<KeyT> keyCoder) {
            return new ShardedKeyCoder<KeyT>(keyCoder);
        }

        /**
         * Creates a coder from a component list.
         *
         * @param components must contain exactly one element, the key coder
         * @throws IllegalArgumentException if {@code components} does not have exactly one element
         */
        @JsonCreator
        public static <KeyT> ShardedKeyCoder<KeyT> of(@JsonProperty(value="component_encodings") List<Coder<KeyT>> components) {
            int componentCount = components.size();
            Preconditions.checkArgument(componentCount == 1, "Expecting 1 component, got %s", componentCount);
            Coder<KeyT> onlyComponent = components.get(0);
            return ShardedKeyCoder.of(onlyComponent);
        }

        protected ShardedKeyCoder(Coder<KeyT> keyCoder) {
            this.shardNumberCoder = VarIntCoder.of();
            this.keyCoder = keyCoder;
        }

        /** Returns a one-element list holding the key coder. */
        @Override
        public List<? extends Coder<?>> getCoderArguments() {
            return Arrays.asList(this.keyCoder);
        }

        @Override
        public void encode(ShardedKey<KeyT> key, OutputStream outStream, Coder.Context context) throws IOException {
            KeyT rawKey = key.getKey();
            this.keyCoder.encode(rawKey, outStream, context.nested());
            // The shard number is the final component, so it is written in the caller's context.
            this.shardNumberCoder.encode(key.getShardNumber(), outStream, context);
        }

        @Override
        public ShardedKey<KeyT> decode(InputStream inStream, Coder.Context context) throws IOException {
            KeyT decodedKey = this.keyCoder.decode(inStream, context.nested());
            Integer decodedShard = this.shardNumberCoder.decode(inStream, context);
            return ShardedKey.of(decodedKey, decodedShard);
        }

        /** Deterministic exactly when the key coder is. */
        @Override
        public void verifyDeterministic() throws Coder.NonDeterministicException {
            this.keyCoder.verifyDeterministic();
        }
    }

    /**
     * Immutable pair of a key and a shard number.
     *
     * @param <K> the type of the key
     */
    private static class ShardedKey<K> {
        private final K key;
        private final int shardNumber;

        private ShardedKey(K key, int shardNumber) {
            this.shardNumber = shardNumber;
            this.key = key;
        }

        /**
         * Creates a sharded key.
         *
         * @param key the key
         * @param shardNumber the shard number paired with the key
         * @return a new {@code ShardedKey} holding both values
         */
        public static <K> ShardedKey<K> of(K key, int shardNumber) {
            return new ShardedKey<K>(key, shardNumber);
        }

        /** Returns the key. */
        public K getKey() {
            return key;
        }

        /** Returns the shard number. */
        public int getShardNumber() {
            return shardNumber;
        }
    }

    /**
     * Buffers rows per destination table for the duration of a bundle and inserts them into
     * BigQuery when the bundle finishes, creating each destination table on first use.
     */
    @SystemDoFnInternal
    private static class StreamingWriteFn
    extends DoFn<KV<ShardedKey<String>, TableRowInfo>, Void> {
        /** JSON form of the table schema used when a destination table has to be created. */
        private final String jsonTableSchema;
        /** Rows buffered in the current bundle, keyed by table spec. */
        private transient Map<String, List<TableRow>> tableRows;
        /** Unique ids of the buffered rows, keyed by table spec, in the same order as the rows. */
        private transient Map<String, List<String>> uniqueIdsForTableRows;
        /** Table specs for which getOrCreateTable has already completed; shared by all instances in this JVM. */
        private static Set<String> createdTables = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
        private Aggregator<Long, Long> byteCountAggregator = this.createAggregator("ByteCount", new Sum.SumLongFn());

        StreamingWriteFn(TableSchema schema) {
            this.jsonTableSchema = BigQueryIO.toJsonString(schema);
        }

        /** Starts the bundle with empty buffers. */
        @Override
        public void startBundle(DoFn.Context context) {
            this.uniqueIdsForTableRows = new HashMap<String, List<String>>();
            this.tableRows = new HashMap<String, List<TableRow>>();
        }

        /** Appends the element's row and unique id to the buffers of its destination table. */
        @Override
        public void processElement(DoFn.ProcessContext context) {
            KV element = (KV)context.element();
            String tableSpec = (String)((ShardedKey)element.getKey()).getKey();
            TableRowInfo rowInfo = (TableRowInfo)element.getValue();
            List<TableRow> bufferedRows = BigQueryIO.getOrCreateMapListValue(this.tableRows, tableSpec);
            List<String> bufferedIds = BigQueryIO.getOrCreateMapListValue(this.uniqueIdsForTableRows, tableSpec);
            bufferedRows.add(rowInfo.tableRow);
            bufferedIds.add(rowInfo.uniqueId);
        }

        /** Flushes every buffered table to BigQuery, then clears the buffers. */
        @Override
        public void finishBundle(DoFn.Context context) throws Exception {
            BigQueryOptions options = context.getPipelineOptions().as(BigQueryOptions.class);
            Bigquery client = Transport.newBigQueryClient(options).build();
            for (Map.Entry<String, List<TableRow>> buffered : this.tableRows.entrySet()) {
                String tableSpec = buffered.getKey();
                TableReference destination = this.getOrCreateTable(options, tableSpec);
                this.flushRows(client, destination, buffered.getValue(), this.uniqueIdsForTableRows.get(tableSpec));
            }
            this.tableRows.clear();
            this.uniqueIdsForTableRows.clear();
        }

        @Override
        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            builder.addIfNotNull(DisplayData.item("schema", this.jsonTableSchema).withLabel("Table Schema"));
        }

        /**
         * Parses {@code tableSpec} and, the first time the spec is seen, asks BigQuery to get
         * or create the table with the configured schema.
         *
         * @param options options used to build the BigQuery client
         * @param tableSpec the destination table specification
         * @return the parsed table reference
         * @throws IOException if the schema cannot be parsed or the table call fails
         */
        public TableReference getOrCreateTable(BigQueryOptions options, String tableSpec) throws IOException {
            TableReference tableReference = BigQueryIO.parseTableSpec(tableSpec);
            if (createdTables.contains(tableSpec)) {
                return tableReference;
            }
            synchronized (createdTables) {
                // Check again under the lock: another thread may have handled this spec in the meantime.
                if (!createdTables.contains(tableSpec)) {
                    TableSchema tableSchema = JSON_FACTORY.fromString(this.jsonTableSchema, TableSchema.class);
                    Bigquery client = Transport.newBigQueryClient(options).build();
                    BigQueryTableInserter inserter = new BigQueryTableInserter(client);
                    inserter.getOrCreateTable(tableReference, Write.WriteDisposition.WRITE_APPEND, Write.CreateDisposition.CREATE_IF_NEEDED, tableSchema);
                    createdTables.add(tableSpec);
                }
            }
            return tableReference;
        }

        /**
         * Inserts the given rows into {@code tableReference}; does nothing for an empty list.
         *
         * @throws RuntimeException wrapping the {@link IOException} if the insert fails
         */
        private void flushRows(Bigquery client, TableReference tableReference, List<TableRow> tableRows, List<String> uniqueIds) {
            if (tableRows.isEmpty()) {
                return;
            }
            try {
                BigQueryTableInserter inserter = new BigQueryTableInserter(client);
                inserter.insertAll(tableReference, tableRows, uniqueIds, this.byteCountAggregator);
            }
            catch (IOException insertFailure) {
                throw new RuntimeException(insertFailure);
            }
        }
    }

    public static class Write {
        /**
         * Creates a new {@link Bound} and forwards to its {@code named(String)} method.
         *
         * @param name the name to pass on
         * @return the {@link Bound} returned by that call
         */
        public static Bound named(String name) {
            Bound unconfigured = new Bound();
            return unconfigured.named(name);
        }

        /**
         * Creates a new {@link Bound} and forwards to its {@code to(String)} method.
         *
         * @param tableSpec the table specification to pass on
         * @return the {@link Bound} returned by that call
         */
        public static Bound to(String tableSpec) {
            Bound unconfigured = new Bound();
            return unconfigured.to(tableSpec);
        }

        /**
         * Creates a new {@link Bound} and forwards to its {@code to(TableReference)} method.
         *
         * @param table the table reference to pass on
         * @return the {@link Bound} returned by that call
         */
        public static Bound to(TableReference table) {
            Bound unconfigured = new Bound();
            return unconfigured.to(table);
        }

        /**
         * Creates a new {@link Bound} and forwards to its {@code to(SerializableFunction)}
         * method.
         *
         * @param tableSpecFunction function from a window to a table specification string
         * @return the {@link Bound} returned by that call
         */
        public static Bound to(SerializableFunction<BoundedWindow, String> tableSpecFunction) {
            Bound unconfigured = new Bound();
            return unconfigured.to(tableSpecFunction);
        }

        /**
         * Creates a new {@link Bound} and forwards to its {@code toTableReference} method.
         *
         * @param tableRefFunction function from a window to a {@link TableReference}
         * @return the {@link Bound} returned by that call
         */
        public static Bound toTableReference(SerializableFunction<BoundedWindow, TableReference> tableRefFunction) {
            Bound unconfigured = new Bound();
            return unconfigured.toTableReference(tableRefFunction);
        }

        /**
         * Creates a new {@link Bound} and forwards to its {@code withSchema} method.
         *
         * @param schema the table schema to pass on
         * @return the {@link Bound} returned by that call
         */
        public static Bound withSchema(TableSchema schema) {
            Bound unconfigured = new Bound();
            return unconfigured.withSchema(schema);
        }

        /**
         * Creates a new {@link Bound} and forwards to its {@code withCreateDisposition} method.
         *
         * @param disposition the create disposition to pass on
         * @return the {@link Bound} returned by that call
         */
        public static Bound withCreateDisposition(CreateDisposition disposition) {
            Bound unconfigured = new Bound();
            return unconfigured.withCreateDisposition(disposition);
        }

        /**
         * Creates a new {@link Bound} and forwards to its {@code withWriteDisposition} method.
         *
         * @param disposition the write disposition to pass on
         * @return the {@link Bound} returned by that call
         */
        public static Bound withWriteDisposition(WriteDisposition disposition) {
            Bound unconfigured = new Bound();
            return unconfigured.withWriteDisposition(disposition);
        }

        /**
         * Creates a new {@link Bound} and forwards to its {@code withoutValidation} method.
         *
         * @return the {@link Bound} returned by that call
         */
        public static Bound withoutValidation() {
            Bound unconfigured = new Bound();
            return unconfigured.withoutValidation();
        }

        // Private constructor: code outside BigQueryIO cannot instantiate Write.
        private Write() {
        }

        /**
         * Copies the temporary tables supplied through a side input into the final
         * destination table with a BigQuery copy job, then deletes the temporary tables.
         */
        static class WriteRename
        extends DoFn<String, Void> {
            private final BigQueryServices bqServices;
            private final String jobIdToken;
            private final String jsonTableRef;
            private final WriteDisposition writeDisposition;
            private final CreateDisposition createDisposition;
            private final PCollectionView<Iterable<String>> tempTablesView;

            /**
             * @param bqServices source of the job and dataset services
             * @param jobIdToken prefix from which copy job ids are derived
             * @param jsonTableRef JSON form of the destination {@link TableReference}
             * @param writeDisposition write disposition of the copy job
             * @param createDisposition create disposition of the copy job
             * @param tempTablesView side input holding the JSON form of each temporary table
             */
            public WriteRename(BigQueryServices bqServices, String jobIdToken, String jsonTableRef, WriteDisposition writeDisposition, CreateDisposition createDisposition, PCollectionView<Iterable<String>> tempTablesView) {
                this.bqServices = bqServices;
                this.jobIdToken = jobIdToken;
                this.jsonTableRef = jsonTableRef;
                this.writeDisposition = writeDisposition;
                this.createDisposition = createDisposition;
                this.tempTablesView = tempTablesView;
            }

            /**
             * Copies all temporary tables into the destination table and removes them
             * afterwards. Does nothing when the side input is empty.
             */
            @Override
            public void processElement(DoFn.ProcessContext c) throws Exception {
                ArrayList<String> tempTablesJson = Lists.newArrayList(c.sideInput(this.tempTablesView));
                if (tempTablesJson.isEmpty()) {
                    return;
                }
                ArrayList<TableReference> tempTables = Lists.newArrayList();
                for (String table : tempTablesJson) {
                    tempTables.add(BigQueryIO.fromJsonString(table, TableReference.class));
                }
                this.copy(this.bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)), this.jobIdToken, BigQueryIO.fromJsonString(this.jsonTableRef, TableReference.class), tempTables, this.writeDisposition, this.createDisposition);
                BigQueryServices.DatasetService tableService = this.bqServices.getDatasetService(c.getPipelineOptions().as(BigQueryOptions.class));
                WriteRename.removeTemporaryTables(tableService, tempTables);
            }

            /**
             * Runs the copy job, retrying failed jobs up to three attempts in total. Each
             * attempt uses the job id {@code <jobIdPrefix>-<zero-based attempt index>}.
             *
             * @throws RuntimeException if the job status cannot be determined or every attempt fails
             * @throws IllegalStateException if the job reports an unexpected status
             */
            private void copy(BigQueryServices.JobService jobService, String jobIdPrefix, TableReference ref, List<TableReference> tempTables, WriteDisposition writeDisposition, CreateDisposition createDisposition) throws InterruptedException, IOException {
                final int maxAttempts = 3;
                JobConfigurationTableCopy copyConfig = new JobConfigurationTableCopy().setSourceTables(tempTables).setDestinationTable(ref).setWriteDisposition(writeDisposition.name()).setCreateDisposition(createDisposition.name());
                String projectId = ref.getProjectId();
                for (int attempt = 0; attempt < maxAttempts; ++attempt) {
                    // The job id keeps the zero-based index; only the log line is one-based.
                    String jobId = jobIdPrefix + "-" + attempt;
                    // Log the attempt one-based so it reads "try 1/3" .. "try 3/3" rather than "try 0/3" .. "try 2/3".
                    LOG.info("Starting BigQuery copy job {}: try {}/{}", new Object[]{jobId, attempt + 1, maxAttempts});
                    JobReference jobRef = new JobReference().setProjectId(projectId).setJobId(jobId);
                    jobService.startCopyJob(jobRef, copyConfig);
                    Status jobStatus = BigQueryIO.parseStatus(jobService.pollJob(jobRef, Integer.MAX_VALUE));
                    switch (jobStatus) {
                        case SUCCEEDED: {
                            return;
                        }
                        case UNKNOWN: {
                            throw new RuntimeException("Failed to poll the copy job status of job " + jobId);
                        }
                        case FAILED: {
                            LOG.info("BigQuery copy job failed: {}", (Object)jobId);
                            // Move on to the next attempt of the enclosing loop.
                            continue;
                        }
                        default: {
                            throw new IllegalStateException(String.format("Unexpected job status: %s of job %s", new Object[]{jobStatus, jobId}));
                        }
                    }
                }
                throw new RuntimeException(String.format("Failed to create the copy job %s, reached max retries: %d", jobIdPrefix, maxAttempts));
            }

            /**
             * Deletes each temporary table on a best-effort basis: a failure to delete one
             * table is logged as a warning and does not stop the remaining deletions.
             */
            static void removeTemporaryTables(BigQueryServices.DatasetService tableService, List<TableReference> tempTables) throws Exception {
                for (TableReference tableRef : tempTables) {
                    try {
                        LOG.debug("Deleting table {}", (Object)BigQueryIO.toJsonString(tableRef));
                        tableService.deleteTable(tableRef.getProjectId(), tableRef.getDatasetId(), tableRef.getTableId());
                    }
                    catch (Exception e) {
                        LOG.warn("Failed to delete the table {}", (Object)BigQueryIO.toJsonString(tableRef), (Object)e);
                    }
                }
            }

            @Override
            public void populateDisplayData(DisplayData.Builder builder) {
                super.populateDisplayData(builder);
                builder.addIfNotNull(DisplayData.item("jobIdToken", this.jobIdToken).withLabel("Job ID Token")).addIfNotNull(DisplayData.item("jsonTableRef", this.jsonTableRef).withLabel("Table Reference")).add(DisplayData.item("writeDisposition", this.writeDisposition.toString()).withLabel("Write Disposition")).add(DisplayData.item("createDisposition", this.createDisposition.toString()).withLabel("Create Disposition"));
            }
        }

        static class WriteTables
        extends DoFn<KV<Long, Iterable<List<String>>>, String> {
            // When false, processElement replaces the destination table id with the per-partition job id prefix.
            private final boolean singlePartition;
            // Source of the job service used to run load jobs.
            private final BigQueryServices bqServices;
            // Prefix from which per-partition job ids are derived.
            private final String jobIdToken;
            // Passed to removeTemporaryFiles after a partition has been loaded.
            private final String tempFilePrefix;
            // JSON form of the destination TableReference.
            private final String jsonTableRef;
            // JSON form of the TableSchema handed to the load job.
            private final String jsonSchema;
            // Write disposition handed to the load job.
            private final WriteDisposition writeDisposition;
            // Create disposition handed to the load job.
            private final CreateDisposition createDisposition;

            /**
             * @param singlePartition when {@code false}, each partition is loaded into a table
             *     whose id is the per-partition job id prefix
             * @param bqServices source of the job service
             * @param jobIdToken prefix from which per-partition job ids are derived
             * @param tempFilePrefix prefix passed on when temporary files are removed
             * @param jsonTableRef JSON form of the destination {@link TableReference}
             * @param jsonSchema JSON form of the {@link TableSchema} for the load job
             * @param writeDisposition write disposition of the load job
             * @param createDisposition create disposition of the load job
             */
            public WriteTables(boolean singlePartition, BigQueryServices bqServices, String jobIdToken, String tempFilePrefix, String jsonTableRef, String jsonSchema, WriteDisposition writeDisposition, CreateDisposition createDisposition) {
                // Services and identifiers.
                this.bqServices = bqServices;
                this.jobIdToken = jobIdToken;
                this.tempFilePrefix = tempFilePrefix;

                // Destination description.
                this.singlePartition = singlePartition;
                this.jsonTableRef = jsonTableRef;
                this.jsonSchema = jsonSchema;

                // Load job dispositions.
                this.createDisposition = createDisposition;
                this.writeDisposition = writeDisposition;
            }

            @Override
            public void processElement(DoFn.ProcessContext c) throws Exception {
                List partition = (List)Lists.newArrayList((Iterable)((KV)c.element()).getValue()).get(0);
                String jobIdPrefix = String.format(this.jobIdToken + "_%05d", ((KV)c.element()).getKey());
                TableReference ref = BigQueryIO.fromJsonString(this.jsonTableRef, TableReference.class);
                if (!this.singlePartition) {
                    ref.setTableId(jobIdPrefix);
                }
                this.load(this.bqServices.getJobService(c.getPipelineOptions().as(BigQueryOptions.class)), jobIdPrefix, ref, BigQueryIO.fromJsonString(this.jsonSchema, TableSchema.class), partition, this.writeDisposition, this.createDisposition);
                c.output(BigQueryIO.toJsonString(ref));
                WriteTables.removeTemporaryFiles(c.getPipelineOptions(), this.tempFilePrefix, partition);
            }

            private void load(BigQueryServices.JobService jobService, String jobIdPrefix, TableReference ref, @Nullable TableSchema schema, List<String> gcsUris, WriteDisposition writeDisposition, CreateDisposition createDisposition) throws InterruptedException, IOException {
                JobConfigurationLoad loadConfig = new JobConfigurationLoad().setDestinationTable(ref).setSchema(schema).setSourceUris(gcsUris).setWriteDisposition(writeDisposition.name()).setCreateDisposition(createDisposition.name()).setSourceFormat("NEWLINE_DELIMITED_JSON");
                String projectId = ref.getProjectId();
                block5: for (int i = 0; i < 3; ++i) {
                    String jobId = jobIdPrefix + "-" + i;
                    LOG.info("Starting BigQuery load job {}: try {}/{}", new Object[]{jobId, i, 3});
                    JobReference jobRef = new JobReference().setProjectId(projectId).setJobId(jobId);
                    jobService.startLoadJob(jobRef, loadConfig);
                    Status jobStatus = BigQueryIO.parseStatus(jobService.pollJob(jobRef, Integer.MAX_VALUE));
                    switch (jobStatus) {
                        case SUCCEEDED: {
                            return;
                        }
                        case UNKNOWN: {
                            throw new RuntimeException("Failed to poll the load job status of job " + jobId);
                        }
                        case FAILED: {
                            LOG.info("BigQuery load job failed: {}", (Object)jobId);
                            continue block5;
                        }
                        default: {
                            throw new IllegalStateException(String.format("Unexpected job status: %s of job %s", new Object[]{jobStatus, jobId}));
                        }
                    }
                }
                throw new RuntimeException(String.format("Failed to create the load job %s, reached max retries: %d", jobIdPrefix, 3));
            }

            static void removeTemporaryFiles(PipelineOptions options, String tempFilePrefix, Collection<String> files) throws IOException {
                IOChannelFactory factory = IOChannelUtils.getFactory(tempFilePrefix);
                if (factory instanceof GcsIOChannelFactory) {
                    GcsUtil gcsUtil = new GcsUtil.GcsUtilFactory().create(options);
                    gcsUtil.remove(files);
                } else if (factory instanceof FileIOChannelFactory) {
                    for (String filename : files) {
                        LOG.debug("Removing file {}", (Object)filename);
                        boolean exists = Files.deleteIfExists(Paths.get(filename, new String[0]));
                        if (exists) continue;
                        LOG.debug("{} does not exist.", (Object)filename);
                    }
                } else {
                    throw new IOException("Unrecognized file system.");
                }
            }

            @Override
            public void populateDisplayData(DisplayData.Builder builder) {
                super.populateDisplayData(builder);
                builder.addIfNotNull(DisplayData.item("jobIdToken", this.jobIdToken).withLabel("Job ID Token")).addIfNotNull(DisplayData.item("tempFilePrefix", this.tempFilePrefix).withLabel("Temporary File Prefix")).addIfNotNull(DisplayData.item("jsonTableRef", this.jsonTableRef).withLabel("Table Reference")).addIfNotNull(DisplayData.item("jsonSchema", this.jsonSchema).withLabel("Table Schema"));
            }
        }

        /**
         * Groups the temporary files listed in the side input into partitions that stay
         * within a file-count limit and a total-size limit.
         *
         * <p>If everything fits into one partition it is emitted on {@code singlePartitionTag};
         * otherwise every partition is emitted on {@code multiPartitionsTag}. Partition ids
         * start at 1.
         */
        static class WritePartition
        extends DoFn<String, KV<Long, List<String>>> {
            /** Largest number of files placed in one partition. */
            private static final int MAX_NUM_FILES = 10000;
            /** Largest total size, in bytes, of the files placed in one partition. */
            private static final long MAX_SIZE_BYTES = 0xB0000000000L;

            private final PCollectionView<Iterable<KV<String, Long>>> resultsView;
            private TupleTag<KV<Long, List<String>>> multiPartitionsTag;
            private TupleTag<KV<Long, List<String>>> singlePartitionTag;

            public WritePartition(PCollectionView<Iterable<KV<String, Long>>> resultsView, TupleTag<KV<Long, List<String>>> multiPartitionsTag, TupleTag<KV<Long, List<String>>> singlePartitionTag) {
                this.resultsView = resultsView;
                this.multiPartitionsTag = multiPartitionsTag;
                this.singlePartitionTag = singlePartitionTag;
            }

            /**
             * Reads the (file name, size) pairs from the side input and emits them as
             * partitions of file names. The input element is the temporary file prefix.
             */
            @Override
            @SuppressWarnings("unchecked")
            public void processElement(DoFn.ProcessContext c) throws Exception {
                List<KV<String, Long>> results = Lists.newArrayList((Iterable<KV<String, Long>>)c.sideInput(resultsView));
                if (results.isEmpty()) {
                    // No files were produced upstream: create one empty file so that there is
                    // still something to partition.
                    TableRowWriter writer = new TableRowWriter((String)c.element());
                    writer.open(UUID.randomUUID().toString());
                    results.add(writer.close());
                }

                long partitionId = 0L;
                int currNumFiles = 0;
                long currSizeBytes = 0L;
                List<String> currResults = Lists.newArrayList();
                for (KV<String, Long> fileResult : results) {
                    boolean tooManyFiles = currNumFiles + 1 > MAX_NUM_FILES;
                    if (tooManyFiles || currSizeBytes + fileResult.getValue() > MAX_SIZE_BYTES) {
                        // Adding this file would exceed a limit: flush what has been collected.
                        c.sideOutput(multiPartitionsTag, KV.of(++partitionId, currResults));
                        currResults = Lists.newArrayList();
                        currNumFiles = 0;
                        currSizeBytes = 0L;
                    }
                    ++currNumFiles;
                    currSizeBytes += fileResult.getValue().longValue();
                    currResults.add(fileResult.getKey());
                }

                // Nothing flushed so far means all files fit into a single partition.
                TupleTag<KV<Long, List<String>>> lastTag = partitionId == 0L ? singlePartitionTag : multiPartitionsTag;
                c.sideOutput(lastTag, KV.of(++partitionId, currResults));
            }

            /** Adds no display data beyond what the superclass contributes. */
            @Override
            public void populateDisplayData(DisplayData.Builder builder) {
                super.populateDisplayData(builder);
            }
        }

        static class TableRowWriter {
            private static final Coder<TableRow> CODER = TableRowJsonCoder.of();
            private static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8);
            private final String tempFilePrefix;
            private String id;
            private String fileName;
            private WritableByteChannel channel;
            protected String mimeType = "text/plain";
            private CountingOutputStream out;

            TableRowWriter(String basename) {
                this.tempFilePrefix = basename;
            }

            public final void open(String uId) throws Exception {
                this.id = uId;
                this.fileName = this.tempFilePrefix + this.id;
                LOG.debug("Opening {}.", (Object)this.fileName);
                this.channel = IOChannelUtils.create(this.fileName, this.mimeType);
                try {
                    this.out = new CountingOutputStream(Channels.newOutputStream(this.channel));
                    LOG.debug("Writing header to {}.", (Object)this.fileName);
                }
                catch (Exception e) {
                    try {
                        LOG.error("Writing header to {} failed, closing channel.", (Object)this.fileName);
                        this.channel.close();
                    }
                    catch (IOException closeException) {
                        LOG.error("Closing channel for {} failed", (Object)this.fileName);
                    }
                    throw e;
                }
                LOG.debug("Starting write of bundle {} to {}.", (Object)this.id, (Object)this.fileName);
            }

            public void write(TableRow value) throws Exception {
                CODER.encode(value, this.out, Coder.Context.OUTER);
                this.out.write(NEWLINE);
            }

            public final KV<String, Long> close() throws IOException {
                this.channel.close();
                return KV.of(this.fileName, this.out.getCount());
            }
        }

        public static class Bound
        extends PTransform<PCollection<TableRow>, PDone> {
            // File-count limit; WritePartition compares against the same value (10000).
            static final int MAX_NUM_FILES = 10000;
            // Total-size limit in bytes: 0xB0000000000 = 11 * 2^40 (11 TiB); WritePartition
            // compares against the same value.
            static final long MAX_SIZE_BYTES = 0xB0000000000L;
            // WriteTables.load makes the same number (3) of load-job attempts.
            static final int MAX_RETRY_JOBS = 3;
            // WriteTables.load passes the same value to pollJob when waiting for a load job.
            static final int LOAD_JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE;
            // JSON form of the destination table; null when a table function is used instead.
            @Nullable
            final String jsonTableRef;
            // Per-window destination table; null when a fixed table reference is used instead.
            @Nullable
            final SerializableFunction<BoundedWindow, TableReference> tableRefFunction;
            // JSON form of the table schema; validate() requires it for CREATE_IF_NEEDED.
            @Nullable
            final String jsonSchema;
            final CreateDisposition createDisposition;
            final WriteDisposition writeDisposition;
            // When false, validate() skips the dataset/table presence and emptiness checks.
            final boolean validate;
            // Services injected through withTestServices(); while null, validate() requires the
            // temp location to parse as a GCS path.
            @Nullable
            private BigQueryServices testBigQueryServices;

            /**
             * Creates an unconfigured write: no name, table, table function or schema,
             * {@code CREATE_IF_NEEDED}, {@code WRITE_EMPTY}, validation enabled and no test
             * services.
             */
            @Deprecated
            public Bound() {
                this(
                    null,  // name
                    null,  // jsonTableRef
                    null,  // tableRefFunction
                    null,  // jsonSchema
                    CreateDisposition.CREATE_IF_NEEDED,
                    WriteDisposition.WRITE_EMPTY,
                    true,  // validate
                    null); // testBigQueryServices
            }

            /**
             * Full constructor used by the public no-arg constructor and by every builder
             * method.
             *
             * @throws NullPointerException if {@code createDisposition} or
             *     {@code writeDisposition} is null
             */
            private Bound(String name, @Nullable String jsonTableRef, @Nullable SerializableFunction<BoundedWindow, TableReference> tableRefFunction, @Nullable String jsonSchema, CreateDisposition createDisposition, WriteDisposition writeDisposition, boolean validate, @Nullable BigQueryServices testBigQueryServices) {
                super(name);
                // Mandatory settings first, checked in declaration order.
                this.createDisposition = Preconditions.checkNotNull(createDisposition, "createDisposition");
                this.writeDisposition = Preconditions.checkNotNull(writeDisposition, "writeDisposition");
                // Optional settings are stored as given.
                this.jsonTableRef = jsonTableRef;
                this.tableRefFunction = tableRefFunction;
                this.jsonSchema = jsonSchema;
                this.validate = validate;
                this.testBigQueryServices = testBigQueryServices;
            }

            /** Returns a copy of this transform that carries the given name. */
            public Bound named(String name) {
                return new Bound(
                    name,
                    jsonTableRef,
                    tableRefFunction,
                    jsonSchema,
                    createDisposition,
                    writeDisposition,
                    validate,
                    testBigQueryServices);
            }

            /**
             * Returns a copy of this transform that writes to the table named by
             * {@code tableSpec}, parsed with {@code BigQueryIO.parseTableSpec}.
             */
            public Bound to(String tableSpec) {
                TableReference parsed = BigQueryIO.parseTableSpec(tableSpec);
                return to(parsed);
            }

            /**
             * Returns a copy of this transform that writes to {@code table}; the reference is
             * stored in its JSON form.
             */
            public Bound to(TableReference table) {
                String tableJson = BigQueryIO.toJsonString(table);
                return new Bound(
                    name,
                    tableJson,
                    tableRefFunction,
                    jsonSchema,
                    createDisposition,
                    writeDisposition,
                    validate,
                    testBigQueryServices);
            }

            /**
             * Returns a copy of this transform whose destination is computed per window:
             * {@code tableSpecFunction} yields a table spec string that is parsed into a table
             * reference.
             */
            public Bound to(SerializableFunction<BoundedWindow, String> tableSpecFunction) {
                SerializableFunction<BoundedWindow, TableReference> asTableReference = new TranslateTableSpecFunction(tableSpecFunction);
                return toTableReference(asTableReference);
            }

            /**
             * Returns a copy of this transform whose destination table is computed per window
             * by {@code tableRefFunction}.
             */
            public Bound toTableReference(SerializableFunction<BoundedWindow, TableReference> tableRefFunction) {
                return new Bound(
                    name,
                    jsonTableRef,
                    tableRefFunction,
                    jsonSchema,
                    createDisposition,
                    writeDisposition,
                    validate,
                    testBigQueryServices);
            }

            /**
             * Returns a copy of this transform that uses {@code schema}; the schema is stored
             * in its JSON form.
             */
            public Bound withSchema(TableSchema schema) {
                String schemaJson = BigQueryIO.toJsonString(schema);
                return new Bound(
                    name,
                    jsonTableRef,
                    tableRefFunction,
                    schemaJson,
                    createDisposition,
                    writeDisposition,
                    validate,
                    testBigQueryServices);
            }

            /**
             * Returns a copy of this transform with the given create disposition.
             *
             * @throws NullPointerException if {@code createDisposition} is null
             */
            public Bound withCreateDisposition(CreateDisposition createDisposition) {
                return new Bound(
                    name,
                    jsonTableRef,
                    tableRefFunction,
                    jsonSchema,
                    createDisposition,
                    writeDisposition,
                    validate,
                    testBigQueryServices);
            }

            /**
             * Returns a copy of this transform with the given write disposition.
             *
             * @throws NullPointerException if {@code writeDisposition} is null
             */
            public Bound withWriteDisposition(WriteDisposition writeDisposition) {
                return new Bound(
                    name,
                    jsonTableRef,
                    tableRefFunction,
                    jsonSchema,
                    createDisposition,
                    writeDisposition,
                    validate,
                    testBigQueryServices);
            }

            /**
             * Returns a copy of this transform with validation switched off, so that
             * {@code validate} skips its dataset, table-presence and table-emptiness checks.
             */
            public Bound withoutValidation() {
                boolean validationDisabled = false;
                return new Bound(
                    name,
                    jsonTableRef,
                    tableRefFunction,
                    jsonSchema,
                    createDisposition,
                    writeDisposition,
                    validationDisabled,
                    testBigQueryServices);
            }

            /** Returns a copy of this transform that uses {@code testServices}. */
            @VisibleForTesting
            Bound withTestServices(BigQueryServices testServices) {
                return new Bound(
                    name,
                    jsonTableRef,
                    tableRefFunction,
                    jsonSchema,
                    createDisposition,
                    writeDisposition,
                    validate,
                    testServices);
            }

            /**
             * Checks that {@code table} holds no data.
             *
             * <p>A table that does not exist passes the check.
             *
             * @throws IllegalArgumentException if the table is not empty
             * @throws RuntimeException if emptiness cannot be determined because of an I/O
             *     error other than "not found"
             */
            private static void verifyTableEmpty(BigQueryOptions options, TableReference table) {
                try {
                    Bigquery client = Transport.newBigQueryClient(options).build();
                    BigQueryTableInserter inserter = new BigQueryTableInserter(client);
                    if (!inserter.isEmpty(table)) {
                        throw new IllegalArgumentException("BigQuery table is not empty: " + BigQueryIO.toTableSpec(table));
                    }
                } catch (IOException e) {
                    ApiErrorExtractor errorExtractor = new ApiErrorExtractor();
                    if (!errorExtractor.itemNotFound(e)) {
                        throw new RuntimeException("unable to confirm BigQuery table emptiness for table " + BigQueryIO.toTableSpec(table), e);
                    }
                    // Not found: there is no table, hence nothing that could be non-empty.
                }
            }

            /**
             * Checks the configuration of this write before it is applied.
             *
             * <p>Exactly one of table reference and table function must be set, and
             * {@code CREATE_IF_NEEDED} requires a schema. With a fixed table and validation
             * enabled, the dataset (and, depending on the dispositions, the table's presence
             * or emptiness) is verified. In streaming mode or with a table function,
             * {@code CREATE_NEVER} and {@code WRITE_TRUNCATE} are rejected; otherwise a temp
             * location is required, and it must be a GCS path unless test services are
             * installed.
             *
             * @throws IllegalStateException if the table settings are missing or conflicting
             * @throws IllegalArgumentException if any other setting is invalid
             */
            @Override
            public void validate(PCollection<TableRow> input) {
                BigQueryOptions options = input.getPipeline().getOptions().as(BigQueryOptions.class);

                boolean hasTableRef = jsonTableRef != null;
                boolean hasTableFunction = tableRefFunction != null;
                Preconditions.checkState(hasTableRef || hasTableFunction, "must set the table reference of a BigQueryIO.Write transform");
                Preconditions.checkState(!(hasTableRef && hasTableFunction), "Cannot set both a table reference and a table function for a BigQueryIO.Write transform");

                boolean mayCreateTable = createDisposition == CreateDisposition.CREATE_IF_NEEDED;
                Preconditions.checkArgument(!mayCreateTable || jsonSchema != null, "CreateDisposition is CREATE_IF_NEEDED, however no schema was provided.");

                if (hasTableRef && this.validate) {
                    TableReference table = getTable();
                    if (Strings.isNullOrEmpty(table.getProjectId())) {
                        // Fall back to the project configured in the pipeline options.
                        table.setProjectId(options.getProject());
                    }
                    BigQueryIO.verifyDatasetPresence(options, table);
                    if (getCreateDisposition() == CreateDisposition.CREATE_NEVER) {
                        BigQueryIO.verifyTablePresence(options, table);
                    }
                    if (getWriteDisposition() == WriteDisposition.WRITE_EMPTY) {
                        Bound.verifyTableEmpty(options, table);
                    }
                }

                if (options.isStreaming() || hasTableFunction) {
                    Preconditions.checkArgument(createDisposition != CreateDisposition.CREATE_NEVER, "CreateDisposition.CREATE_NEVER is not supported for an unbounded PCollection or when using a tablespec function.");
                    Preconditions.checkArgument(writeDisposition != WriteDisposition.WRITE_TRUNCATE, "WriteDisposition.WRITE_TRUNCATE is not supported for an unbounded PCollection or when using a tablespec function.");
                    return;
                }

                String tempLocation = options.getTempLocation();
                Preconditions.checkArgument(!Strings.isNullOrEmpty(tempLocation), "BigQueryIO.Write needs a GCS temp location to store temp files.");
                if (testBigQueryServices != null) {
                    // Test services are installed: the temp location need not be on GCS.
                    return;
                }
                try {
                    GcsPath.fromUri(tempLocation);
                } catch (IllegalArgumentException e) {
                    throw new IllegalArgumentException(String.format("BigQuery temp location expected a valid 'gs://' path, but was given '%s'", tempLocation), e);
                }
            }

            /**
             * Expands this write into the transforms that perform it.
             *
             * <p>In streaming mode, or when a table function is set, the input is handed to
             * {@code StreamWithDeDup}. Otherwise rows are written to temporary files under the
             * temp location, the files are grouped into partitions, and each partition is
             * loaded by {@code WriteTables}: directly into the destination table when there is
             * a single partition, or into per-partition tables that are passed to
             * {@code WriteRename} when there are several.
             *
             * @throws RuntimeException if the temporary file prefix cannot be resolved
             */
            @Override
            public PDone apply(PCollection<TableRow> input) {
                String tempFilePrefix;
                Pipeline p = input.getPipeline();
                BigQueryOptions options = p.getOptions().as(BigQueryOptions.class);
                BigQueryServices bqServices = this.getBigQueryServices();
                if (options.isStreaming() || this.tableRefFunction != null) {
                    return input.apply(new StreamWithDeDup(this.getTable(), this.tableRefFunction, this.getSchema()));
                }
                TableReference table = BigQueryIO.fromJsonString(this.jsonTableRef, TableReference.class);
                if (Strings.isNullOrEmpty(table.getProjectId())) {
                    // Fall back to the project configured in the pipeline options.
                    table.setProjectId(options.getProject());
                }
                // The token is the prefix of every load job id and the last component of the
                // temporary file prefix.
                String jobIdToken = "beam_job_" + BigQueryIO.randomUUIDString();
                String tempLocation = options.getTempLocation();
                try {
                    // Prefix = <tempLocation>/BigQueryWriteTemp/<jobIdToken>, as resolved by the
                    // channel factory that handles the temp location.
                    IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation);
                    tempFilePrefix = factory.resolve(factory.resolve(tempLocation, "BigQueryWriteTemp"), jobIdToken);
                }
                catch (IOException e) {
                    throw new RuntimeException(String.format("Failed to resolve BigQuery temp location in %s", tempLocation), e);
                }
                // Single-element collection that drives the once-only steps (WritePartition and
                // WriteRename).
                PCollection singleton = (PCollection)p.apply("Create", Create.of(tempFilePrefix));
                PCollection inputInGlobalWindow = (PCollection)input.apply(Window.into(new GlobalWindows()).triggering(DefaultTrigger.of()).discardingFiredPanes());
                // One temporary file per bundle; each output is a (file name, byte count) pair.
                PCollection results = (PCollection)((Object)inputInGlobalWindow.apply("WriteBundles", ParDo.of(new WriteBundles(tempFilePrefix))));
                TupleTag<KV<Long, List<String>>> multiPartitionsTag = new TupleTag<KV<Long, List<String>>>("multiPartitionsTag"){};
                TupleTag<KV<Long, List<String>>> singlePartitionTag = new TupleTag<KV<Long, List<String>>>("singlePartitionTag"){};
                PCollectionView resultsView = (PCollectionView)results.apply("ResultsView", View.asIterable());
                PCollectionTuple partitions = (PCollectionTuple)((Object)singleton.apply(ParDo.of(new WritePartition(resultsView, multiPartitionsTag, singlePartitionTag)).withSideInputs(resultsView).withOutputTags(multiPartitionsTag, TupleTagList.of(singlePartitionTag))));
                // Several partitions: each is loaded into its own table (singlePartition=false)
                // with WRITE_EMPTY / CREATE_IF_NEEDED, regardless of the configured dispositions.
                PCollection tempTables = (PCollection)((Object)((PCollection)partitions.get(multiPartitionsTag).apply("MultiPartitionsGroupByKey", GroupByKey.create())).apply("MultiPartitionsWriteTables", ParDo.of(new WriteTables(false, bqServices, jobIdToken, tempFilePrefix, BigQueryIO.toJsonString(table), this.jsonSchema, WriteDisposition.WRITE_EMPTY, CreateDisposition.CREATE_IF_NEEDED))));
                PCollectionView tempTablesView = (PCollectionView)tempTables.apply("TempTablesView", View.asIterable());
                // WriteRename receives the per-partition tables together with the configured
                // dispositions and the destination table.
                singleton.apply(ParDo.of(new WriteRename(bqServices, jobIdToken, BigQueryIO.toJsonString(table), this.writeDisposition, this.createDisposition, tempTablesView)).withSideInputs(tempTablesView));
                // Single partition: loaded straight into the destination table with the
                // configured dispositions.
                ((PCollection)partitions.get(singlePartitionTag).apply("SinglePartitionGroupByKey", GroupByKey.create())).apply("SinglePartitionWriteTables", ParDo.of(new WriteTables(true, bqServices, jobIdToken, tempFilePrefix, BigQueryIO.toJsonString(table), this.jsonSchema, this.writeDisposition, this.createDisposition)));
                return PDone.in(input.getPipeline());
            }

            /** Returns the void coder, matching the {@code PDone} output of this transform. */
            @Override
            protected Coder<Void> getDefaultOutputCoder() {
                Coder<Void> voidCoder = VoidCoder.of();
                return voidCoder;
            }

            /**
             * Registers the table reference and schema (each skipped when null), the class of
             * the table function if one is set, both dispositions, and the validation flag
             * when it differs from its default of {@code true}.
             */
            @Override
            public void populateDisplayData(DisplayData.Builder builder) {
                super.populateDisplayData(builder);
                builder
                    .addIfNotNull(DisplayData.item("table", jsonTableRef).withLabel("Table Reference"))
                    .addIfNotNull(DisplayData.item("schema", jsonSchema).withLabel("Table Schema"));
                if (tableRefFunction != null) {
                    builder.add(DisplayData.item("tableFn", tableRefFunction.getClass()).withLabel("Table Reference Function"));
                }
                builder
                    .add(DisplayData.item("createDisposition", createDisposition.toString()).withLabel("Table CreateDisposition"))
                    .add(DisplayData.item("writeDisposition", writeDisposition.toString()).withLabel("Table WriteDisposition"))
                    .addIfNotDefault(DisplayData.item("validation", this.validate).withLabel("Validation Enabled"), true);
            }

            /** Returns the create disposition configured for this write. */
            public CreateDisposition getCreateDisposition() {
                return createDisposition;
            }

            /** Returns the write disposition configured for this write. */
            public WriteDisposition getWriteDisposition() {
                return writeDisposition;
            }

            /** Returns the table schema, parsed from its stored JSON form on every call. */
            public TableSchema getSchema() {
                TableSchema parsedSchema = BigQueryIO.fromJsonString(jsonSchema, TableSchema.class);
                return parsedSchema;
            }

            /**
             * Returns the destination table, parsed from its stored JSON form on every call.
             * Each call yields a fresh object, so changes to it do not affect this transform.
             */
            @Nullable
            public TableReference getTable() {
                TableReference parsedTable = BigQueryIO.fromJsonString(jsonTableRef, TableReference.class);
                return parsedTable;
            }

            /** Returns whether validation is enabled for this write. */
            public boolean getValidate() {
                return validate;
            }

            /**
             * Returns the BigQuery services to use: the injected test services when present,
             * otherwise a new default implementation.
             *
             * <p>The default is deliberately not stored in {@code testBigQueryServices}.
             * {@code validate} treats a non-null value of that field as "test services are
             * installed" and then skips the GCS temp-location check, and every builder method
             * copies the field into the transform it returns. Storing the default there would
             * silently disable that check for this instance and everything derived from it.
             */
            private BigQueryServices getBigQueryServices() {
                if (this.testBigQueryServices != null) {
                    return this.testBigQueryServices;
                }
                return new BigQueryServicesImpl();
            }

            /**
             * Writes the rows of each bundle to one temporary file and, when the bundle
             * finishes, outputs the pair returned by closing that file's writer (file name and
             * byte count).
             */
            private class WriteBundles
            extends DoFn<TableRow, KV<String, Long>> {
                private TableRowWriter writer = null;
                private final String tempFilePrefix;

                WriteBundles(String tempFilePrefix) {
                    this.tempFilePrefix = tempFilePrefix;
                }

                /**
                 * Appends the element to the bundle's file, opening a writer with a random
                 * file id on the first element of the bundle.
                 */
                @Override
                public void processElement(DoFn.ProcessContext c) throws Exception {
                    if (writer == null) {
                        writer = new TableRowWriter(tempFilePrefix);
                        writer.open(UUID.randomUUID().toString());
                        LOG.debug("Done opening writer {}", writer);
                    }
                    try {
                        writer.write((TableRow)c.element());
                    } catch (Exception writeFailure) {
                        // Close the file, but never let a close failure hide the write failure.
                        try {
                            writer.close();
                        } catch (Exception closeFailure) {
                            writeFailure.addSuppressed(closeFailure);
                        }
                        throw writeFailure;
                    }
                }

                /** Closes the bundle's writer, if one was opened, and outputs its result. */
                @Override
                public void finishBundle(DoFn.Context c) throws Exception {
                    if (writer == null) {
                        return;
                    }
                    c.output(writer.close());
                    writer = null;
                }

                /** Registers the temporary file prefix as display data when it is not null. */
                @Override
                public void populateDisplayData(DisplayData.Builder builder) {
                    super.populateDisplayData(builder);
                    builder.addIfNotNull(
                        DisplayData.item("tempFilePrefix", tempFilePrefix).withLabel("Temporary File Prefix"));
                }
            }

            /**
             * Adapts a window-to-table-spec function into a window-to-table-reference
             * function by parsing the spec with {@code BigQueryIO.parseTableSpec}.
             */
            private static class TranslateTableSpecFunction
            implements SerializableFunction<BoundedWindow, TableReference> {
                private SerializableFunction<BoundedWindow, String> tableSpecFunction;

                TranslateTableSpecFunction(SerializableFunction<BoundedWindow, String> tableSpecFunction) {
                    this.tableSpecFunction = tableSpecFunction;
                }

                /** Returns the table reference parsed from the spec computed for {@code value}. */
                @Override
                public TableReference apply(BoundedWindow value) {
                    String tableSpec = tableSpecFunction.apply(value);
                    return BigQueryIO.parseTableSpec(tableSpec);
                }
            }
        }

        /**
         * How a write treats data already present in the destination table. The constant's
         * name is what gets set on the load job configuration.
         */
        public static enum WriteDisposition {
            /** Rejected by validation in streaming mode or when a table function is used. */
            WRITE_TRUNCATE,

            /** Passed to the load job as-is; validation performs no extra check for it. */
            WRITE_APPEND,

            /**
             * Default of the no-arg constructor; with validation enabled the destination
             * table is verified to be empty (or absent) up front.
             */
            WRITE_EMPTY
        }

        /**
         * Whether a write may create the destination table. The constant's name is what gets
         * set on the load job configuration.
         */
        public static enum CreateDisposition {
            /**
             * With validation enabled the table's presence is verified up front; rejected by
             * validation in streaming mode or when a table function is used.
             */
            CREATE_NEVER,

            /** Default of the no-arg constructor; validation requires a schema to be set. */
            CREATE_IF_NEEDED
        }
    }

    @VisibleForTesting
    static class TransformingSource<T, V>
    extends BoundedSource<V> {
        private final BoundedSource<T> boundedSource;
        private final SerializableFunction<T, V> function;
        private final Coder<V> outputCoder;

        TransformingSource(BoundedSource<T> boundedSource, SerializableFunction<T, V> function, Coder<V> outputCoder) {
            this.boundedSource = Preconditions.checkNotNull(boundedSource, "boundedSource");
            this.function = Preconditions.checkNotNull(function, "function");
            this.outputCoder = Preconditions.checkNotNull(outputCoder, "outputCoder");
        }

        @Override
        public List<? extends BoundedSource<V>> splitIntoBundles(long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
            return Lists.transform(this.boundedSource.splitIntoBundles(desiredBundleSizeBytes, options), new Function<BoundedSource<T>, BoundedSource<V>>(){

                @Override
                public BoundedSource<V> apply(BoundedSource<T> input) {
                    return new TransformingSource(input, TransformingSource.this.function, TransformingSource.this.outputCoder);
                }
            });
        }

        @Override
        public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
            return this.boundedSource.getEstimatedSizeBytes(options);
        }

        @Override
        public boolean producesSortedKeys(PipelineOptions options) throws Exception {
            return this.boundedSource.producesSortedKeys(options);
        }

        @Override
        public BoundedSource.BoundedReader<V> createReader(PipelineOptions options) throws IOException {
            return new TransformingReader(this.boundedSource.createReader(options));
        }

        @Override
        public void validate() {
            this.boundedSource.validate();
        }

        @Override
        public Coder<V> getDefaultOutputCoder() {
            return this.outputCoder;
        }

        private class TransformingReader
        extends BoundedSource.BoundedReader<V> {
            private final BoundedSource.BoundedReader<T> boundedReader;

            private TransformingReader(BoundedSource.BoundedReader<T> boundedReader) {
                this.boundedReader = Preconditions.checkNotNull(boundedReader, "boundedReader");
            }

            @Override
            public synchronized BoundedSource<V> getCurrentSource() {
                return new TransformingSource(this.boundedReader.getCurrentSource(), TransformingSource.this.function, TransformingSource.this.outputCoder);
            }

            @Override
            public boolean start() throws IOException {
                return this.boundedReader.start();
            }

            @Override
            public boolean advance() throws IOException {
                return this.boundedReader.advance();
            }

            @Override
            public V getCurrent() throws NoSuchElementException {
                Object current = this.boundedReader.getCurrent();
                return TransformingSource.this.function.apply(current);
            }

            @Override
            public void close() throws IOException {
                this.boundedReader.close();
            }

            @Override
            public synchronized BoundedSource<V> splitAtFraction(double fraction) {
                BoundedSource split = this.boundedReader.splitAtFraction(fraction);
                return split == null ? null : new TransformingSource(split, TransformingSource.this.function, TransformingSource.this.outputCoder);
            }

            @Override
            public Double getFractionConsumed() {
                return this.boundedReader.getFractionConsumed();
            }

            @Override
            public Instant getCurrentTimestamp() throws NoSuchElementException {
                return this.boundedReader.getCurrentTimestamp();
            }
        }
    }

    private static abstract class BigQuerySourceBase
    extends BoundedSource<TableRow> {
        private static final int MAX_FILES_VERIFY_RETRIES = 9;
        protected static final int JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE;
        private static final Duration INITIAL_FILES_VERIFY_BACKOFF = Duration.standardSeconds((long)1L);
        protected final String jobIdToken;
        protected final String extractDestinationDir;
        protected final BigQueryServices bqServices;
        protected final String executingProject;

        private BigQuerySourceBase(String jobIdToken, String extractDestinationDir, BigQueryServices bqServices, String executingProject) {
            this.jobIdToken = Preconditions.checkNotNull(jobIdToken, "jobIdToken");
            this.extractDestinationDir = Preconditions.checkNotNull(extractDestinationDir, "extractDestinationDir");
            this.bqServices = Preconditions.checkNotNull(bqServices, "bqServices");
            this.executingProject = Preconditions.checkNotNull(executingProject, "executingProject");
        }

        @Override
        public List<BoundedSource<TableRow>> splitIntoBundles(long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
            BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
            TableReference tableToExtract = this.getTableToExtract(bqOptions);
            BigQueryServices.JobService jobService = this.bqServices.getJobService(bqOptions);
            String extractJobId = BigQueryIO.getExtractJobId(this.jobIdToken);
            List<String> tempFiles = this.executeExtract(extractJobId, tableToExtract, jobService);
            TableSchema tableSchema = this.bqServices.getDatasetService(bqOptions).getTable(tableToExtract.getProjectId(), tableToExtract.getDatasetId(), tableToExtract.getTableId()).getSchema();
            this.cleanupTempResource(bqOptions);
            return this.createSources(tempFiles, tableSchema);
        }

        protected abstract TableReference getTableToExtract(BigQueryOptions var1) throws Exception;

        protected abstract void cleanupTempResource(BigQueryOptions var1) throws Exception;

        /**
         * Reports whether this source produces sorted keys.
         *
         * @param options pipeline options (unused)
         * @return always {@code false}
         */
        @Override
        public boolean producesSortedKeys(PipelineOptions options) throws Exception {
            return false;
        }

        /**
         * Intentionally empty: this source performs no checks here. The constructor already
         * null-checks its arguments.
         */
        @Override
        public void validate() {
        }

        /**
         * Returns the coder for the rows produced by this source.
         *
         * @return {@code TableRowJsonCoder.of()}
         */
        @Override
        public Coder<TableRow> getDefaultOutputCoder() {
            return TableRowJsonCoder.of();
        }

        /**
         * Starts an AVRO extract job for {@code table}, waits for it and returns the extracted file paths.
         *
         * @param jobId id to give the extract job
         * @param table table to extract
         * @param jobService service used to start and poll the job
         * @return immutable list of the files the job produced under {@code extractDestinationDir}
         * @throws IOException if the job does not finish with status {@code SUCCEEDED}
         * @throws InterruptedException if starting or polling the job is interrupted
         */
        private List<String> executeExtract(String jobId, TableReference table, BigQueryServices.JobService jobService) throws InterruptedException, IOException {
            JobReference jobRef = new JobReference().setProjectId(executingProject).setJobId(jobId);
            String destinationUri = BigQueryIO.getExtractDestinationUri(extractDestinationDir);
            JobConfigurationExtract extract = new JobConfigurationExtract().setSourceTable(table).setDestinationFormat("AVRO").setDestinationUris(ImmutableList.of(destinationUri));
            LOG.info("Starting BigQuery extract job: {}", (Object)jobId);
            jobService.startExtractJob(jobRef, extract);
            // Same value as JOB_POLL_MAX_RETRIES.
            Job extractJob = jobService.pollJob(jobRef, Integer.MAX_VALUE);
            if (BigQueryIO.parseStatus(extractJob) != Status.SUCCEEDED) {
                // Build the message without dereferencing extractJob: if polling yielded no job,
                // the caller should see this IOException rather than a NullPointerException.
                JobStatus status = extractJob == null ? null : extractJob.getStatus();
                throw new IOException(String.format("Extract job %s failed, status: %s", jobId, status));
            }
            return ImmutableList.copyOf(BigQueryIO.getExtractFilePaths(extractDestinationDir, extractJob));
        }

        /**
         * Wraps every extracted file in a source that reads Avro records and converts them to rows.
         *
         * @param files paths of the extracted Avro files
         * @param tableSchema schema used to convert each record to a {@link TableRow}
         * @return immutable list with one source per entry of {@code files}
         * @throws IOException if the schema cannot be rendered as JSON
         */
        private List<BoundedSource<TableRow>> createSources(List<String> files, TableSchema tableSchema) throws IOException, InterruptedException {
            // The function captures the schema as a JSON string and parses it again on every call.
            final String jsonSchema = JSON_FACTORY.toString(tableSchema);
            SerializableFunction<GenericRecord, TableRow> recordToRow = new SerializableFunction<GenericRecord, TableRow>(){

                @Override
                public TableRow apply(GenericRecord input) {
                    try {
                        TableSchema parsedSchema = JSON_FACTORY.fromString(jsonSchema, TableSchema.class);
                        return AvroUtils.convertGenericRecordToTableRow(input, parsedSchema);
                    }
                    catch (IOException e) {
                        throw new RuntimeException("Failed to convert GenericRecord to TableRow", e);
                    }
                }
            };

            List<BoundedSource<TableRow>> perFileSources = Lists.newArrayList();
            for (String file : files) {
                perFileSources.add(new TransformingSource<GenericRecord, TableRow>(AvroSource.from(file), recordToRow, getDefaultOutputCoder()));
            }
            return ImmutableList.copyOf(perFileSources);
        }

        /**
         * A bounded reader that forwards every call to a {@code BigQueryServices.BigQueryJsonReader}
         * and reports the source it was created for.
         */
        protected static class BigQueryReader
        extends BoundedSource.BoundedReader<TableRow> {
            private final BigQuerySourceBase source;
            private final BigQueryServices.BigQueryJsonReader reader;

            /**
             * @param source the source returned by {@link #getCurrentSource()}
             * @param reader the delegate that actually reads rows
             */
            private BigQueryReader(BigQuerySourceBase source, BigQueryServices.BigQueryJsonReader reader) {
                this.source = source;
                this.reader = reader;
            }

            /** Returns the source this reader was created for. */
            @Override
            public BoundedSource<TableRow> getCurrentSource() {
                return source;
            }

            /** Delegates to the underlying reader's {@code start()}. */
            @Override
            public boolean start() throws IOException {
                return reader.start();
            }

            /** Delegates to the underlying reader's {@code advance()}. */
            @Override
            public boolean advance() throws IOException {
                return reader.advance();
            }

            /** Delegates to the underlying reader's {@code getCurrent()}. */
            @Override
            public TableRow getCurrent() throws NoSuchElementException {
                return reader.getCurrent();
            }

            /** Closes the underlying reader. */
            @Override
            public void close() throws IOException {
                reader.close();
            }
        }
    }

    /**
     * A {@link BigQuerySourceBase} that reads the result of a BigQuery query.
     *
     * <p>For splitting, the query is executed into a temporary table (in a temporary dataset) which
     * is then extracted; both are deleted again in {@link #cleanupTempResource}. The size estimate
     * comes from a dry run of the query, whose statistics are cached per instance.
     */
    @VisibleForTesting
    static class BigQueryQuerySource
    extends BigQuerySourceBase {
        private final String query;
        // JSON form of the temporary table reference the query results are written to.
        private final String jsonQueryTempTable;
        private final Boolean flattenResults;
        // Cached dry-run statistics; transient, so it is re-created empty in readObject.
        private transient AtomicReference<JobStatistics> dryRunJobStats;

        /** Static factory; see the constructor for the meaning of the arguments. */
        static BigQueryQuerySource create(String jobIdToken, String query, TableReference queryTempTableRef, Boolean flattenResults, String extractDestinationDir, BigQueryServices bqServices) {
            return new BigQueryQuerySource(jobIdToken, query, queryTempTableRef, flattenResults, extractDestinationDir, bqServices);
        }

        /**
         * @param jobIdToken token the job ids are derived from
         * @param query the query to run
         * @param queryTempTableRef temporary table for the query results; its project id is also
         *     used as the executing project
         * @param flattenResults whether query results are flattened
         * @param extractDestinationDir directory the extract job writes to
         * @param bqServices provider of the BigQuery services
         */
        private BigQueryQuerySource(String jobIdToken, String query, TableReference queryTempTableRef, Boolean flattenResults, String extractDestinationDir, BigQueryServices bqServices) {
            super(jobIdToken, extractDestinationDir, bqServices, Preconditions.checkNotNull(queryTempTableRef, "queryTempTableRef").getProjectId());
            this.query = Preconditions.checkNotNull(query, "query");
            this.jsonQueryTempTable = BigQueryIO.toJsonString(queryTempTableRef);
            this.flattenResults = Preconditions.checkNotNull(flattenResults, "flattenResults");
            this.dryRunJobStats = new AtomicReference<JobStatistics>();
        }

        /** Returns the {@code totalBytesProcessed} reported by a dry run of the query. */
        @Override
        public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
            BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
            return dryRunQueryIfNeeded(bqOptions).getTotalBytesProcessed();
        }

        /** Creates a reader that reads the query results through {@code getReaderFromQuery}. */
        @Override
        public BoundedSource.BoundedReader<TableRow> createReader(PipelineOptions options) throws IOException {
            BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
            return new BigQuerySourceBase.BigQueryReader(this, bqServices.getReaderFromQuery(bqOptions, query, executingProject, flattenResults));
        }

        /**
         * Creates the temporary dataset, runs the query into the temporary table and returns that table.
         *
         * <p>The dataset is created in the location of the first table the query references. When
         * the dry run reports no referenced tables (for example a query over literals only), the
         * location is left {@code null} instead of failing on {@code get(0)}.
         */
        @Override
        protected TableReference getTableToExtract(BigQueryOptions bqOptions) throws IOException, InterruptedException {
            List<TableReference> referencedTables = dryRunQueryIfNeeded(bqOptions).getQuery().getReferencedTables();
            BigQueryServices.DatasetService tableService = bqServices.getDatasetService(bqOptions);
            // NOTE(review): assumes createDataset accepts a null location; getLocation() may
            // already have returned null before this change - confirm.
            String location = null;
            if (referencedTables != null && !referencedTables.isEmpty()) {
                TableReference firstReferenced = referencedTables.get(0);
                location = tableService.getTable(firstReferenced.getProjectId(), firstReferenced.getDatasetId(), firstReferenced.getTableId()).getLocation();
            }
            TableReference tableToExtract = JSON_FACTORY.fromString(jsonQueryTempTable, TableReference.class);
            tableService.createDataset(tableToExtract.getProjectId(), tableToExtract.getDatasetId(), location, "");
            String queryJobId = jobIdToken + "-query";
            BigQueryQuerySource.executeQuery(executingProject, queryJobId, query, tableToExtract, flattenResults, bqServices.getJobService(bqOptions));
            return tableToExtract;
        }

        /** Deletes the temporary table and then the temporary dataset that held the query results. */
        @Override
        protected void cleanupTempResource(BigQueryOptions bqOptions) throws Exception {
            TableReference tableToRemove = JSON_FACTORY.fromString(jsonQueryTempTable, TableReference.class);
            BigQueryServices.DatasetService tableService = bqServices.getDatasetService(bqOptions);
            tableService.deleteTable(tableToRemove.getProjectId(), tableToRemove.getDatasetId(), tableToRemove.getTableId());
            tableService.deleteDataset(tableToRemove.getProjectId(), tableToRemove.getDatasetId());
        }

        /** Adds the query text to the display data. */
        @Override
        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            builder.add(DisplayData.item("query", query));
        }

        /**
         * Returns the dry-run statistics of the query, running the dry run only if no result is cached.
         * Synchronized so that concurrent callers on the same instance do not both issue the dry run.
         */
        private synchronized JobStatistics dryRunQueryIfNeeded(BigQueryOptions bqOptions) throws InterruptedException, IOException {
            if (dryRunJobStats.get() == null) {
                JobStatistics jobStats = bqServices.getJobService(bqOptions).dryRunQuery(executingProject, query);
                dryRunJobStats.compareAndSet(null, jobStats);
            }
            return dryRunJobStats.get();
        }

        /**
         * Runs {@code query} as a batch-priority job writing into {@code destinationTable} and waits for it.
         *
         * @throws IOException if the job does not finish with status {@code SUCCEEDED}
         */
        private static void executeQuery(String executingProject, String jobId, String query, TableReference destinationTable, boolean flattenResults, BigQueryServices.JobService jobService) throws IOException, InterruptedException {
            JobReference jobRef = new JobReference().setProjectId(executingProject).setJobId(jobId);
            JobConfigurationQuery queryConfig = new JobConfigurationQuery();
            queryConfig.setQuery(query).setAllowLargeResults(Boolean.valueOf(true)).setCreateDisposition("CREATE_IF_NEEDED").setDestinationTable(destinationTable).setFlattenResults(Boolean.valueOf(flattenResults)).setPriority("BATCH").setWriteDisposition("WRITE_EMPTY");
            jobService.startQueryJob(jobRef, queryConfig);
            Job job = jobService.pollJob(jobRef, Integer.MAX_VALUE);
            if (BigQueryIO.parseStatus(job) != Status.SUCCEEDED) {
                throw new IOException("Query job failed: " + jobId);
            }
        }

        /** Restores the transient dry-run cache, empty, after default deserialization. */
        private void readObject(ObjectInputStream in) throws ClassNotFoundException, IOException {
            in.defaultReadObject();
            this.dryRunJobStats = new AtomicReference<JobStatistics>();
        }
    }

    @VisibleForTesting
    static class BigQueryTableSource
    extends BigQuerySourceBase {
        private final String jsonTable;
        private final AtomicReference<Long> tableSizeBytes;

        static BigQueryTableSource create(String jobIdToken, TableReference table, String extractDestinationDir, BigQueryServices bqServices, String executingProject) {
            return new BigQueryTableSource(jobIdToken, table, extractDestinationDir, bqServices, executingProject);
        }

        private BigQueryTableSource(String jobIdToken, TableReference table, String extractDestinationDir, BigQueryServices bqServices, String executingProject) {
            super(jobIdToken, extractDestinationDir, bqServices, executingProject);
            this.jsonTable = BigQueryIO.toJsonString(Preconditions.checkNotNull(table, "table"));
            this.tableSizeBytes = new AtomicReference();
        }

        @Override
        protected TableReference getTableToExtract(BigQueryOptions bqOptions) throws IOException {
            return (TableReference)JSON_FACTORY.fromString(this.jsonTable, TableReference.class);
        }

        @Override
        public BoundedSource.BoundedReader<TableRow> createReader(PipelineOptions options) throws IOException {
            BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
            TableReference tableRef = (TableReference)JSON_FACTORY.fromString(this.jsonTable, TableReference.class);
            return new BigQuerySourceBase.BigQueryReader(this, this.bqServices.getReaderFromTable(bqOptions, tableRef));
        }

        @Override
        public synchronized long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
            if (this.tableSizeBytes.get() == null) {
                TableReference table = (TableReference)JSON_FACTORY.fromString(this.jsonTable, TableReference.class);
                Long numBytes = this.bqServices.getDatasetService(options.as(BigQueryOptions.class)).getTable(table.getProjectId(), table.getDatasetId(), table.getTableId()).getNumBytes();
                this.tableSizeBytes.compareAndSet(null, numBytes);
            }
            return this.tableSizeBytes.get();
        }

        @Override
        protected void cleanupTempResource(BigQueryOptions bqOptions) throws Exception {
        }

        @Override
        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            builder.add(DisplayData.item("table", this.jsonTable));
        }
    }

    /**
     * A transform that returns its input elements unchanged and additionally schedules a
     * {@link CleanupOperation} in the same pipeline.
     *
     * <p>The cleanup step takes a side input derived from a second (signal) output of the
     * pass-through step; presumably this makes the cleanup wait until the pass-through step has
     * finished - confirm against the runner's side-input semantics.
     */
    @VisibleForTesting
    static class PassThroughThenCleanup<T>
    extends PTransform<PCollection<T>, PCollection<T>> {
        // The operation handed to the "Cleanup" step as its single input element.
        private CleanupOperation cleanupOperation;

        PassThroughThenCleanup(CleanupOperation cleanupOperation) {
            this.cleanupOperation = cleanupOperation;
        }

        /**
         * Wires the pass-through and cleanup steps.
         *
         * @param input the collection to pass through
         * @return the main output of the identity step, i.e. the elements of {@code input}
         */
        @Override
        public PCollection<T> apply(PCollection<T> input) {
            TupleTag mainOutput = new TupleTag();
            TupleTag cleanupSignal = new TupleTag();
            // Identity ParDo with two outputs; IdentityFn only ever writes to the main output.
            PCollectionTuple outputs = (PCollectionTuple)input.apply(ParDo.of(new IdentityFn()).withOutputTags(mainOutput, TupleTagList.of(cleanupSignal)));
            // Turn the signal output into a singleton view that defaults to null.
            PCollectionView cleanupSignalView = (PCollectionView)((PCollection)outputs.get(cleanupSignal).setCoder((Coder)VoidCoder.of())).apply(View.asSingleton().withDefaultValue(null));
            // A one-element collection holding the operation, processed by a DoFn that runs it.
            ((PCollection)input.getPipeline().apply("Create(CleanupOperation)", Create.of(this.cleanupOperation))).apply("Cleanup", ParDo.of(new DoFn<CleanupOperation, Void>(){

                @Override
                public void processElement(DoFn.ProcessContext c) throws Exception {
                    ((CleanupOperation)c.element()).cleanup(c.getPipelineOptions());
                }
            }).withSideInputs(cleanupSignalView));
            return outputs.get(mainOutput);
        }

        /** A serializable action that is run once by the "Cleanup" step. */
        static abstract class CleanupOperation
        implements Serializable {
            CleanupOperation() {
            }

            /**
             * Performs the cleanup.
             *
             * @param var1 the pipeline options available to the cleanup step
             * @throws Exception if the cleanup fails
             */
            abstract void cleanup(PipelineOptions var1) throws Exception;
        }

        /** A DoFn that outputs every input element unchanged. */
        private static class IdentityFn<T>
        extends DoFn<T, T> {
            private IdentityFn() {
            }

            @Override
            public void processElement(DoFn.ProcessContext c) {
                c.output(c.element());
            }
        }
    }

    /**
     * Static entry points for building a {@link Bound} transform that reads rows from a BigQuery
     * table or from the result of a query. Not instantiable.
     */
    public static class Read {
        /** Returns a read transform with the given name. */
        public static Bound named(String name) {
            return new Bound().named(name);
        }

        /** Returns a read transform for the table described by {@code tableSpec}. */
        public static Bound from(String tableSpec) {
            return new Bound().from(tableSpec);
        }

        /** Returns a read transform for the result of {@code query}. */
        public static Bound fromQuery(String query) {
            return new Bound().fromQuery(query);
        }

        /** Returns a read transform for the given table. */
        public static Bound from(TableReference table) {
            return new Bound().from(table);
        }

        /** Returns a read transform with validation disabled. */
        public static Bound withoutValidation() {
            return new Bound().withoutValidation();
        }

        private Read() {
        }

        /**
         * A transform that reads {@link TableRow}s from a BigQuery table or query.
         *
         * <p>Every configuration method returns a new instance and leaves the receiver unchanged.
         * Reading extracts the data to files under the pipeline's temp location and removes those
         * files again in a cleanup step.
         */
        public static class Bound
        extends PTransform<PInput, PCollection<TableRow>> {
            // JSON form of the table to read, or null when reading from a query.
            @Nullable
            final String jsonTableRef;
            // Query to read from, or null when reading from a table.
            @Nullable
            final String query;
            // Whether validate(PInput) performs its checks.
            final boolean validate;
            // Flattening preference; only meaningful together with a query.
            @Nullable
            final Boolean flattenResults;
            // Services override for tests; lazily replaced by the real implementation when null.
            @Nullable
            BigQueryServices testBigQueryServices;
            private static final String QUERY_VALIDATION_FAILURE_ERROR = "Validation of query \"%1$s\" failed. If the query depends on an earlier stage of the pipeline, This validation can be disabled using #withoutValidation.";

            private Bound() {
                this(null, null, null, true, null, null);
            }

            private Bound(String name, @Nullable String query, @Nullable String jsonTableRef, boolean validate, @Nullable Boolean flattenResults, @Nullable BigQueryServices testBigQueryServices) {
                super(name);
                this.jsonTableRef = jsonTableRef;
                this.query = query;
                this.validate = validate;
                this.flattenResults = flattenResults;
                this.testBigQueryServices = testBigQueryServices;
            }

            /** Returns a copy of this transform with the given name. */
            public Bound named(String name) {
                return new Bound(name, this.query, this.jsonTableRef, this.validate, this.flattenResults, this.testBigQueryServices);
            }

            /** Returns a copy that reads the table described by {@code tableSpec}. */
            public Bound from(String tableSpec) {
                return this.from(BigQueryIO.parseTableSpec(tableSpec));
            }

            /** Returns a copy that reads the given table. */
            public Bound from(TableReference table) {
                return new Bound(this.name, this.query, BigQueryIO.toJsonString(table), this.validate, this.flattenResults, this.testBigQueryServices);
            }

            /**
             * Returns a copy that reads the result of {@code query}. Result flattening defaults to
             * {@code true} unless a preference has already been set.
             */
            public Bound fromQuery(String query) {
                return new Bound(this.name, query, this.jsonTableRef, this.validate, MoreObjects.firstNonNull(this.flattenResults, Boolean.TRUE), this.testBigQueryServices);
            }

            /** Returns a copy whose {@link #validate(PInput)} performs no checks. */
            public Bound withoutValidation() {
                return new Bound(this.name, this.query, this.jsonTableRef, false, this.flattenResults, this.testBigQueryServices);
            }

            /** Returns a copy with result flattening turned off. */
            public Bound withoutResultFlattening() {
                return new Bound(this.name, this.query, this.jsonTableRef, this.validate, false, this.testBigQueryServices);
            }

            /** Returns a copy that uses the given services instead of the real implementation. */
            @VisibleForTesting
            Bound withTestServices(BigQueryServices testServices) {
                return new Bound(this.name, this.query, this.jsonTableRef, this.validate, this.flattenResults, testServices);
            }

            /**
             * Checks the configuration before the pipeline runs; does nothing when validation is disabled.
             *
             * <p>Checks, in order: a non-empty temp location (and, unless test services are set,
             * one that {@code GcsPath.fromUri} accepts); exactly one of table and query; no
             * flattening preference with a table; a flattening preference with a query; presence
             * of the dataset and table; and a dry run of the query.
             *
             * @throws IllegalArgumentException if the temp location is missing or invalid, or the
             *     query dry run fails
             * @throws IllegalStateException if the table/query/flattening combination is invalid
             */
            @Override
            public void validate(PInput input) {
                if (!this.validate) {
                    return;
                }
                BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class);
                String tempLocation = bqOptions.getTempLocation();
                Preconditions.checkArgument(!Strings.isNullOrEmpty(tempLocation), "BigQueryIO.Read needs a GCS temp location to store temp files.");
                if (this.testBigQueryServices == null) {
                    try {
                        GcsPath.fromUri(tempLocation);
                    }
                    catch (IllegalArgumentException e) {
                        throw new IllegalArgumentException(String.format("BigQuery temp location expected a valid 'gs://' path, but was given '%s'", tempLocation), e);
                    }
                }
                TableReference table = this.getTableWithDefaultProject(bqOptions);
                if (table == null && this.query == null) {
                    throw new IllegalStateException("Invalid BigQuery read operation, either table reference or query has to be set");
                }
                if (table != null && this.query != null) {
                    throw new IllegalStateException("Invalid BigQuery read operation. Specifies both a query and a table, only one of these should be provided");
                }
                if (table != null && this.flattenResults != null) {
                    throw new IllegalStateException("Invalid BigQuery read operation. Specifies a table with a result flattening preference, which is not configurable");
                }
                if (this.query != null && this.flattenResults == null) {
                    throw new IllegalStateException("Invalid BigQuery read operation. Specifies a query without a result flattening preference");
                }
                if (table != null) {
                    BigQueryIO.verifyDatasetPresence(bqOptions, table);
                    BigQueryIO.verifyTablePresence(bqOptions, table);
                }
                if (this.query != null) {
                    Bound.dryRunQuery(bqOptions, this.query);
                }
            }

            /**
             * Issues {@code query} as a dry-run request against the options' project.
             *
             * @throws IllegalArgumentException wrapping any failure of the request
             */
            private static void dryRunQuery(BigQueryOptions options, String query) {
                Bigquery client = Transport.newBigQueryClient(options).build();
                QueryRequest request = new QueryRequest();
                request.setQuery(query);
                request.setDryRun(Boolean.valueOf(true));
                String queryValidationErrorMsg = String.format(QUERY_VALIDATION_FAILURE_ERROR, query);
                try {
                    BigQueryTableRowIterator.executeWithBackOff(client.jobs().query(options.getProject(), request), queryValidationErrorMsg);
                }
                catch (Exception e) {
                    throw new IllegalArgumentException(queryValidationErrorMsg, e);
                }
            }

            /**
             * Expands this transform into a read from a query or table source followed by a step
             * that deletes the extracted files.
             *
             * <p>A fresh UUID is used for the job id token, the extract directory under the temp
             * location and, for queries, the names of the temporary dataset and table.
             *
             * @throws RuntimeException if the extract directory cannot be resolved; the underlying
             *     {@link IOException} is attached as the cause
             */
            @Override
            public PCollection<TableRow> apply(PInput input) {
                BigQuerySourceBase source;
                final String extractDestinationDir;
                String uuid = BigQueryIO.randomUUIDString();
                final String jobIdToken = "beam_job_" + uuid;
                BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class);
                final BigQueryServices bqServices = this.getBigQueryServices();
                String tempLocation = bqOptions.getTempLocation();
                try {
                    IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation);
                    extractDestinationDir = factory.resolve(tempLocation, uuid);
                }
                catch (IOException e) {
                    // Keep the original failure as the cause so the reason is not lost.
                    throw new RuntimeException(String.format("Failed to resolve extract destination directory in %s", tempLocation), e);
                }
                final String executingProject = bqOptions.getProject();
                if (!Strings.isNullOrEmpty(this.query)) {
                    String queryTempDatasetId = "temp_dataset_" + uuid;
                    String queryTempTableId = "temp_table_" + uuid;
                    TableReference queryTempTableRef = new TableReference().setProjectId(executingProject).setDatasetId(queryTempDatasetId).setTableId(queryTempTableId);
                    source = BigQueryQuerySource.create(jobIdToken, this.query, queryTempTableRef, this.flattenResults, extractDestinationDir, bqServices);
                } else {
                    TableReference inputTable = this.getTableWithDefaultProject(bqOptions);
                    source = BigQueryTableSource.create(jobIdToken, inputTable, extractDestinationDir, bqServices, executingProject);
                }
                PassThroughThenCleanup.CleanupOperation cleanupOperation = new PassThroughThenCleanup.CleanupOperation(){

                    /**
                     * Removes the extracted files: those reported by the extract job when it can
                     * be fetched, otherwise whatever matches inside the extract directory.
                     */
                    @Override
                    void cleanup(PipelineOptions options) throws Exception {
                        BigQueryOptions cleanupOptions = options.as(BigQueryOptions.class);
                        JobReference jobRef = new JobReference().setProjectId(executingProject).setJobId(BigQueryIO.getExtractJobId(jobIdToken));
                        Job extractJob = bqServices.getJobService(cleanupOptions).getJob(jobRef);
                        Collection<String> extractFiles = null;
                        if (extractJob != null) {
                            extractFiles = BigQueryIO.getExtractFilePaths(extractDestinationDir, extractJob);
                        } else {
                            IOChannelFactory factory = IOChannelUtils.getFactory(extractDestinationDir);
                            Collection<String> dirMatch = factory.match(extractDestinationDir);
                            if (!dirMatch.isEmpty()) {
                                extractFiles = factory.match(factory.resolve(extractDestinationDir, "*"));
                            }
                        }
                        if (extractFiles != null && !extractFiles.isEmpty()) {
                            new GcsUtil.GcsUtilFactory().create(options).remove(extractFiles);
                        }
                    }
                };
                return input.getPipeline()
                    .apply(com.google.cloud.dataflow.sdk.io.Read.from(source))
                    .setCoder(this.getDefaultOutputCoder())
                    .apply(new PassThroughThenCleanup<TableRow>(cleanupOperation));
            }

            /** Returns {@code TableRowJsonCoder.of()}. */
            @Override
            protected Coder<TableRow> getDefaultOutputCoder() {
                return TableRowJsonCoder.of();
            }

            /**
             * Adds the table (when set), the query and flattening preference (when non-null) and
             * the validation flag (when it differs from {@code true}) to the display data.
             */
            @Override
            public void populateDisplayData(DisplayData.Builder builder) {
                super.populateDisplayData(builder);
                TableReference table = this.getTable();
                if (table != null) {
                    builder.add(DisplayData.item("table", BigQueryIO.toTableSpec(table)).withLabel("Table"));
                }
                builder.addIfNotNull(DisplayData.item("query", this.query).withLabel("Query")).addIfNotNull(DisplayData.item("flattenResults", this.flattenResults).withLabel("Flatten Query Results")).addIfNotDefault(DisplayData.item("validation", this.validate).withLabel("Validation Enabled"), true);
            }

            /**
             * Returns the configured table, with its project id filled in from the options when
             * the reference has none; {@code null} when no table is configured.
             */
            @Nullable
            private TableReference getTableWithDefaultProject(BigQueryOptions bqOptions) {
                TableReference table = this.getTable();
                if (table != null && table.getProjectId() == null) {
                    table.setProjectId(bqOptions.getProject());
                }
                return table;
            }

            /** Returns the configured table, parsed from its JSON form via {@code BigQueryIO.fromJsonString}. */
            public TableReference getTable() {
                return BigQueryIO.fromJsonString(this.jsonTableRef, TableReference.class);
            }

            /** Returns the configured query, or {@code null} when reading from a table. */
            public String getQuery() {
                return this.query;
            }

            /** Returns whether validation is enabled. */
            public boolean getValidate() {
                return this.validate;
            }

            /** Returns the result flattening preference, or {@code null} when none is set. */
            public Boolean getFlattenResults() {
                return this.flattenResults;
            }

            /**
             * Returns the services to use. When none were injected, a {@code BigQueryServicesImpl}
             * is created and stored in {@code testBigQueryServices} for later calls.
             */
            private BigQueryServices getBigQueryServices() {
                if (this.testBigQueryServices == null) {
                    this.testBigQueryServices = new BigQueryServicesImpl();
                }
                return this.testBigQueryServices;
            }
        }
    }
}

