I'm building an Apache Beam streaming pipeline whose source is Pub/Sub and whose sink is BigQuery. I got this error message:

"Workflow failed. Causes: Unknown message code."

As cryptic as this message is, I now believe that BigQuery is not supported as a sink for streaming pipelines. That is what it says here: Streaming from Pub/Sub to BigQuery

Am I right that this is what's causing the problem? And if not, is BigQuery still unsupported as a streaming sink in any case?

Can anyone hint at when this feature will be released? It's a shame, I was pretty excited to start using this.

1 Answer

Python streaming pipelines have been experimentally available since Beam 2.5.0, as documented in the Beam docs.

You will therefore need to install apache-beam 2.5.0 together with the apache-beam[gcp] extras:

pip install apache-beam==2.5.0
pip install apache-beam[gcp]

I ran this command:

python pubsub_to_bq.py --runner DataflowRunner --input_topic=projects/pubsub-public-data/topics/taxirides-realtime --project <my-project> --temp_location gs://<my-bucket>/tmp --staging_location gs://<my-bucket>/staging --streaming

It uses the code below, which works fine:

from __future__ import absolute_import

import argparse
import json
import logging

import apache_beam as beam


def parse_pubsub(line):
    """Parse one JSON message and keep only the columns of the BQ table."""
    record = json.loads(line)
    fields = ('ride_id', 'point_idx', 'latitude', 'longitude', 'timestamp',
              'meter_increment', 'ride_status', 'meter_reading',
              'passenger_count')
    # Return a dict directly: WriteToBigQuery expects one dict per row.
    # (The original tuple-unpacking lambda only parses on Python 2.)
    return {name: record[name] for name in fields}


def run(argv=None):
    """Build and run the pipeline."""

    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input_topic', dest='input_topic', required=True,
        help='Input PubSub topic of the form '
             '"projects/<PROJECT>/topics/<TOPIC>".')
    # Unrecognised flags (--runner, --project, --streaming, ...) go to Beam.
    known_args, pipeline_args = parser.parse_known_args(argv)

    with beam.Pipeline(argv=pipeline_args) as p:

        # Read from PubSub
        lines = p | beam.io.ReadFromPubSub(topic=known_args.input_topic)
        # Adapt messages from PubSub to BQ table rows
        rows = lines | beam.Map(parse_pubsub)
        # Write to a BQ table
        rows | beam.io.WriteToBigQuery(
            table='<my-table>', dataset='<my-dataset>', project='<my-project>')


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()

This code reads from the publicly available topic projects/pubsub-public-data/topics/taxirides-realtime (passed via --input_topic) and writes to a BQ table that I had already created with the right schema.
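
The table schema itself is not shown in this answer. As a rough sketch, it could be written in the comma-separated "name:TYPE" string form that WriteToBigQuery accepts for its schema argument. The column types below are my assumption about the taxirides-realtime messages, and TAXI_FIELDS, build_schema and TAXI_SCHEMA are hypothetical names used only for illustration, so check them against your own table:

```python
# Assumed column types for the taxirides-realtime messages.
# These are NOT taken from the answer; adjust them to match your table.
TAXI_FIELDS = [
    ('ride_id', 'STRING'),
    ('point_idx', 'INTEGER'),
    ('latitude', 'FLOAT'),
    ('longitude', 'FLOAT'),
    ('timestamp', 'TIMESTAMP'),
    ('meter_reading', 'FLOAT'),
    ('meter_increment', 'FLOAT'),
    ('ride_status', 'STRING'),
    ('passenger_count', 'INTEGER'),
]


def build_schema(fields):
    """Join (name, type) pairs into a 'name:TYPE,name:TYPE' schema string."""
    return ','.join('{}:{}'.format(name, ftype) for name, ftype in fields)


TAXI_SCHEMA = build_schema(TAXI_FIELDS)
print(TAXI_SCHEMA)
```

If the table does not exist yet, passing schema=TAXI_SCHEMA to beam.io.WriteToBigQuery together with create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED should let the sink create it, provided the types really match the data.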

If you use this example, be careful not to leave it running, or you will incur costs: this PubSub topic delivers a lot of messages.
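
One way to keep costs down is to check the parsing step locally before launching anything on Dataflow. The sketch below needs only the standard library. parse_message is a hypothetical stand-in for the parsing function in the pipeline, and the sample message is made up (it only mimics the field names used above), so treat it as an illustration and not as real topic data:

```python
import json

# Columns expected by the BQ table, as listed in the pipeline code.
FIELDS = ('ride_id', 'point_idx', 'latitude', 'longitude', 'timestamp',
          'meter_increment', 'ride_status', 'meter_reading',
          'passenger_count')


def parse_message(payload):
    """Decode one payload (bytes or str) into a dict of table columns."""
    if isinstance(payload, bytes):
        payload = payload.decode('utf-8')
    record = json.loads(payload)
    # Keep only the known columns; a missing one raises KeyError early.
    return {name: record[name] for name in FIELDS}


# Made-up sample message, including one extra key that should be dropped.
sample = json.dumps({
    'ride_id': 'abc-123',
    'point_idx': 7,
    'latitude': 40.75,
    'longitude': -73.99,
    'timestamp': '2018-06-01T12:00:00Z',
    'meter_increment': 0.05,
    'ride_status': 'enroute',
    'meter_reading': 3.5,
    'passenger_count': 2,
    'extra': 'ignored',
}).encode('utf-8')

row = parse_message(sample)
print(sorted(row))
```

Messages that lack a column fail here with a KeyError instead of failing later inside the BigQuery write.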