8 votes

I would like to insert Pub/Sub message data coming from a topic into a BigQuery table using Google Cloud Dataflow. Everything works, but in the BigQuery table I can see unreadable strings like " ߈���". This is my pipeline:

p.apply(PubsubIO.Read.named("ReadFromPubsub").topic("projects/project-name/topics/topic-name"))
 .apply(ParDo.named("Transformation").of(new StringToRowConverter()))
 .apply(BigQueryIO.Write.named("Write into BigQuery")
     .to("project-name:dataset-name.table")
     .withSchema(schema)
     .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

and my simple StringToRowConverter function is:

class StringToRowConverter extends DoFn<String, TableRow> {
  private static final long serialVersionUID = 0;

  @Override
  public void processElement(ProcessContext c) {
    for (String word : c.element().split(",")) {
      if (!word.isEmpty()) {
        System.out.println(word);
        c.output(new TableRow().set("data", word));
      }
    }
  }
}

And this is the message I sent through a POST request:

POST https://pubsub.googleapis.com/v1/projects/project-name/topics/topic-name:publish
{
  "messages": [
    {
      "attributes": {
        "key": "tablet, smartphone, desktop",
        "value": "eng"
      },
      "data": "34gf5ert"
    }
  ]
}

What am I missing? Thank you!

Comment (2 votes): This is an open-source tool you can use to direct Pub/Sub to BQ: PUG

2 Answers

9 votes

According to https://cloud.google.com/pubsub/reference/rest/v1/PubsubMessage, the "data" field in the JSON payload of a Pub/Sub message is base64-encoded. PubsubIO in Dataflow, by default, uses the UTF-8 string coder. The example string you provided, "34gf5ert", when base64-decoded and then interpreted as a UTF-8 string, gives exactly "߈���".
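This is easy to verify with plain Java (the class name below is illustrative, not part of either SDK):

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class PubsubBase64Demo {
    public static void main(String[] args) {
        // Pub/Sub treats the "data" field as base64, so "34gf5ert" is
        // decoded to raw bytes before Dataflow ever sees it.
        byte[] decoded = Base64.getDecoder().decode("34gf5ert");
        // Those bytes are not valid UTF-8, hence the mojibake in BigQuery.
        System.out.println(new String(decoded, StandardCharsets.UTF_8));

        // To publish the literal string "34gf5ert", base64-encode it first:
        String data = Base64.getEncoder()
                .encodeToString("34gf5ert".getBytes(StandardCharsets.UTF_8));
        System.out.println(data); // prints "MzRnZjVlcnQ="
    }
}
```

Publishing `"data": "MzRnZjVlcnQ="` in the POST body would make the pipeline receive the intended string.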

2 votes

This is how I am unpacking my Pub/Sub messages:

// Requires com.google.gson.Gson and com.google.gson.reflect.TypeToken
@Override
public void processElement(ProcessContext c) {
    String json = c.element();

    HashMap<String, String> items = new Gson().fromJson(json,
            new TypeToken<HashMap<String, String>>() {}.getType());
    String unpacked = items.get("JsonKey");
}

Hope it's useful to you.
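For reference, the Gson call above can be exercised on its own; the sample JSON body and class name here are made up for illustration:

```java
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import java.util.HashMap;

public class GsonUnpackDemo {
    public static void main(String[] args) {
        // A made-up message body with the shape this answer assumes.
        String json = "{\"JsonKey\": \"hello\"}";
        // Deserialize the JSON object into a String-to-String map.
        HashMap<String, String> items = new Gson().fromJson(json,
                new TypeToken<HashMap<String, String>>() {}.getType());
        System.out.println(items.get("JsonKey")); // prints "hello"
    }
}
```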