I want to write the values of key-value pairs to text files in GCS, one file per key, using FileIO.writeDynamic() in Apache Beam (Java).

So far, I read the data from BigQuery, transform it into key-value pairs, and then try to use FileIO.writeDynamic() to write the values into one file per key:

PCollection<TableRow> inputRows = p.apply(BigQueryIO.readTableRows()
    .from(tableSpec)
    .withMethod(Method.DIRECT_READ)
    .withSelectedFields(Lists.newArrayList("id", "string1", "string2", "string3", "int1")));

inputRows.apply(MapElements.into(TypeDescriptors.kvs(TypeDescriptors.integers(), TypeDescriptors.strings()))
    .via(tableRow -> KV.of((Integer) tableRow.get("id"),(String) tableRow.get("string1"))))
    .apply(FileIO.<String, KV<String, String>>writeDynamic()
    .by(KV::getKey)
    .withDestinationCoder(StringUtf8Coder.of())
    .via(Contextful.fn(KV::getValue), TextIO.sink())
    .to("gs://bucket/output")
    .withNaming(key -> FileIO.Write.defaultNaming("file-" + key, ".txt")));

I get the error:

The method apply
  (PTransform<? super PCollection<KV<Integer,String>>,OutputT>) 
  in the type PCollection<KV<Integer,String>> 
  is not applicable for the arguments 
  (FileIO.Write<String,KV<String,String>>)

1 Answer

There is a type mismatch. MapElements turns each TableRow into a KV<Integer, String> (the key is an Integer), but the write step is declared for elements with a String key, as in .apply(FileIO.<String, KV<String, String>>writeDynamic()), so apply() rejects the transform at compile time:

inputRows.apply(MapElements.into(TypeDescriptors.kvs(TypeDescriptors.integers(), TypeDescriptors.strings()))
    .via(tableRow -> KV.of((Integer) tableRow.get("id"),(String) tableRow.get("string1"))))
    .apply(FileIO.<String, KV<String, String>>writeDynamic()
    .by(KV::getKey)
    ...

To avoid having to convert the key again in .by(KV::getKey), I would recommend casting it to a String beforehand, in MapElements (this snippet uses the id and name fields of the test table described below):

inputRows
    .apply(MapElements.into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
        .via(tableRow -> KV.of((String) tableRow.get("id"),(String) tableRow.get("name"))))
    .apply(FileIO.<String, KV<String, String>>writeDynamic()
        .by(KV::getKey)
        ...
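
If it is unclear whether the id field arrives as a String or as a number, a conversion is more forgiving than a cast. The following is only a sketch under assumptions: KeyConversionSketch and toKey are hypothetical names that are not part of Beam, and a plain Map<String, Object> stands in for TableRow so the example runs without any Beam dependency.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Beam: turns a raw field value into a String key.
final class KeyConversionSketch {

    private KeyConversionSketch() {}

    // Reads `field` from the row and returns it as a String key.
    // Assumption: a Map<String, Object> stands in for TableRow in this sketch.
    static String toKey(Map<String, Object> row, String field) {
        Object raw = row.get(field);
        if (raw == null) {
            throw new IllegalArgumentException("Missing field: " + field);
        }
        // String.valueOf accepts String, Integer and Long values alike, whereas
        // a (String) cast throws ClassCastException for the numeric types.
        return String.valueOf(raw);
    }

    public static void main(String[] args) {
        Map<String, Object> row = new HashMap<>();
        row.put("id", 746L); // numeric id, as a cast to String could not handle
        row.put("name", "Lots Road, West Chelsea");
        System.out.println("file-" + toKey(row, "id")); // prints "file-746"
    }
}
```

In the pipeline, the same idea would amount to replacing the (String) cast on the key with String.valueOf(tableRow.get("id")) inside the MapElements lambda.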

As an example, I tested this with the public table bigquery-public-data:london_bicycles.cycle_stations, writing each bike station to a different file:

$ cat output/file-746-00000-of-00004.txt 
Lots Road, West Chelsea

$ bq query --use_legacy_sql=false "SELECT name FROM \`bigquery-public-data.london_bicycles.cycle_stations\` WHERE id = 746"
Waiting on bqjob_<ID> ... (0s) Current status: DONE   
+-------------------------+
|          name           |
+-------------------------+
| Lots Road, West Chelsea |
+-------------------------+
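
The file name in the cat command above combines the "file-" + key prefix and the ".txt" suffix passed to defaultNaming with a shard index and shard count. The sketch below only mimics that visible pattern for illustration; ShardNameSketch is a hypothetical class, not Beam's implementation, and it ignores windowing and compression suffixes.

```java
import java.util.Locale;

// Illustrative stand-in, NOT Beam's code: reproduces the
// "<prefix>-SSSSS-of-NNNNN<suffix>" pattern seen in the output file name.
final class ShardNameSketch {

    private ShardNameSketch() {}

    // Builds a shard file name with zero-padded, five-digit shard numbers.
    static String shardName(String prefix, String suffix, int shardIndex, int numShards) {
        return String.format(Locale.ROOT, "%s-%05d-of-%05d%s",
                prefix, shardIndex, numShards, suffix);
    }

    public static void main(String[] args) {
        String key = "746"; // the station id used as the destination key
        System.out.println(shardName("file-" + key, ".txt", 0, 4));
        // prints "file-746-00000-of-00004.txt"
    }
}
```

Because the shard count is part of the name, a single key can map to several files when the runner decides to use more than one shard for it.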

Full code:

package com.dataflow.samples;

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.Contextful;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;


public abstract class DynamicGCSWrites {

    public interface Options extends PipelineOptions {
        @Validation.Required
        @Description("Output path, e.g. gs://BUCKET/path/to/output/folder")
        String getOutput();
        void setOutput(String s);
    }

    public static void main(String[] args) {

        DynamicGCSWrites.Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(DynamicGCSWrites.Options.class);

        Pipeline p = Pipeline.create(options);

        String output = options.getOutput();

        PCollection<TableRow> inputRows = p
            .apply(BigQueryIO.readTableRows()
                .from("bigquery-public-data:london_bicycles.cycle_stations")
                .withMethod(Method.DIRECT_READ)
                .withSelectedFields(Lists.newArrayList("id", "name")));

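        inputRows
            // Build KV<String, String> pairs: station id (key) -> station name (value).
            .apply(MapElements.into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
                .via(tableRow -> KV.of((String) tableRow.get("id"),(String) tableRow.get("name"))))
            // The type parameters must match the element type produced above.
            .apply(FileIO.<String, KV<String, String>>writeDynamic()
                // Route each element to a destination given by its key.
                .by(KV::getKey)
                .withDestinationCoder(StringUtf8Coder.of())
                // Write only the value of each pair as a line of text.
                .via(Contextful.fn(KV::getValue), TextIO.sink())
                .to(output)
                // One file name prefix per key, e.g. file-746-...txt
                .withNaming(key -> FileIO.Write.defaultNaming("file-" + key, ".txt")));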

        p.run().waitUntilFinish();
    }
}