
We are trying to load data from GCS into Beam in a Pub/Sub-driven way: whenever new data is uploaded to GCS, a notification should let us load it promptly through Pub/Sub in Beam. However, the pipeline fails to load the data from GCS.

My pipeline is


import json

import apache_beam as beam
from apache_beam.metrics import Metrics


class ParseAndFilterDo(beam.DoFn):
    def __init__(self):
        super(ParseAndFilterDo, self).__init__()
        # Counts lines that fail to parse as JSON.
        self.num_parse_errors = Metrics.counter(self.__class__, 'num_parse_errors')

    def process(self, element):
        text_line = element.strip()
        try:
            data = json.loads(text_line)
            print(data)
            yield data
        except Exception as ex:
            print("Parse json exception:", ex)
            self.num_parse_errors.inc()

 ...

pipeline_args.extend([
    '--runner=DirectRunner',
    '--staging_location=gs://my-transform-bucket/stage',
    '--temp_location=gs://my-transform-bucket/temp',
    '--job_name=test-sub-job',
])
options = PipelineOptions(pipeline_args)
options.view_as(SetupOptions).save_main_session = True
options.view_as(StandardOptions).streaming = True

with beam.Pipeline(options=options) as p:
    events = p | "ReadPubSub" >> beam.io.ReadFromPubSub(topic=args.topic)

    raw_events = (
        events
        | 'DecodeString' >> beam.Map(lambda b: b.decode('utf-8'))
        | "ParseAndFilterDo" >> beam.ParDo(ParseAndFilterDo())
    )

And the notification topic is attached to the GCS bucket with:

gsutil notification create -t testtopic -f json -e OBJECT_FINALIZE gs://my-test-bucket
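The notification configuration can be double-checked with:

gsutil notification list gs://my-test-bucket

which should list testtopic with the OBJECT_FINALIZE event type.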

The Google Cloud Pub/Sub API is also enabled.
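For reference, it can be enabled from the command line with:

gcloud services enable pubsub.googleapis.com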

Then I upload gzipped JSON data (.log.gz files) to my-test-bucket, and the logs show:

DEBUG:urllib3.connectionpool:Starting new HTTPS connection (1): oauth2.googleapis.com:443
DEBUG:urllib3.connectionpool:https://oauth2.googleapis.com:443 "POST /token HTTP/1.1" 200 None
{u'kind': u'storage#object', u'contentType': u'application/x-gzip', u'name': u'log_2019-08-12T00.4763-4caf-b712-cd1b815c203932.log.gz', u'timeCreated': u'2019-08-14T05:47:19.664Z', u'generation': u'1565761639664269', u'md5Hash': u'7mAixitzv6WDVVa1ar37Vw==', u'bucket': u'my-test-bucket', u'updated': u'2019-08-14T05:47:19.664Z', u'crc32c': u'UHiIrQ==', u'metageneration': u'1', u'mediaLink': u'https://www.googleapis.com/download/storage/v1/b/my-test-bucket/o/log_2019-08-12T00.4763-4caf-b712-cd1b815c203932.log.gz?generation=15657616399&alt=media', u'storageClass': u'MULTI_REGIONAL', u'timeStorageClassUpdated': u'2019-08-14T05:47:19.664Z', u'etag': u'CI2V19LEAE=', u'id': u'my-test-bucket/log_2019-08-12T00.4763-4caf-b712-cd1b815c203932.log.gz/1565761639664269', u'selfLink': u'https://www.googleapis.com/storage/v1/b/my-test-bucket/o/log_2019-08-12T00.4763-4caf-b712-cd1b815c203932.log.gz', u'size': u'55259'}
DEBUG:root:Connecting using Google Application Default Credentials.
DEBUG:root:Attempting to flush to all destinations. Total buffered: 0

It seems only the storage object event (the notification metadata) is triggered here; there is no data payload for Beam to read.

Is there an issue with my configuration, or am I missing something?

  • beam version: 2.14.0
  • google-cloud-pubsub: 0.45.0
  • grpcio: 1.22.0

1 Answer


Pub/Sub notifications will only contain event metadata (the uploaded object is not sent through Pub/Sub messages).

If I understand the use case correctly and you want to read the file contents, you will need to parse the notification to get the full file path and then pass the resulting PCollection to beam.io.ReadAllFromText(), as in:

import logging

import apache_beam as beam

class ExtractFn(beam.DoFn):
    def process(self, element):
        # element['id'] looks like 'bucket/path/to/object/generation';
        # drop the generation suffix and prepend the gs:// scheme.
        file_name = 'gs://' + "/".join(element['id'].split("/")[:-1])
        logging.info('File: ' + file_name)
        yield file_name

Note that I used the id field of the sample message you provided, dropping the last path segment, which is the object's generation number.
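As a side note (a sketch on my part, not part of the code linked below): Cloud Storage notifications also carry the bucket and object name as Pub/Sub message attributes, named bucketId and objectId, so the path can be built without parsing the JSON body by reading with with_attributes=True:

file_names = (
    p
    | 'Read Messages' >> beam.io.ReadFromPubSub(
        topic="projects/PROJECT/topics/TOPIC", with_attributes=True)
    # PubsubMessage objects expose the notification attributes as a dict.
    | 'Extract File Names' >> beam.Map(
        lambda msg: 'gs://%s/%s' % (msg.attributes['bucketId'],
                                    msg.attributes['objectId']))
)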

My main pipeline is:

(p
  | 'Read Messages' >> beam.io.ReadFromPubSub(topic="projects/PROJECT/topics/TOPIC")
  | 'Convert Message to JSON' >> beam.Map(lambda message: json.loads(message))
  | 'Extract File Names' >> beam.ParDo(ExtractFn())
  | 'Read Files' >> beam.io.ReadAllFromText()
  | 'Write Results' >> beam.ParDo(LogFn()))
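LogFn is not shown above; a minimal sketch of what it might look like (assuming the json and logging imports, and that it simply logs each line it receives) is:

class LogFn(beam.DoFn):
    # Minimal sketch: log each text line produced by ReadAllFromText.
    def process(self, element):
        logging.info(element)
        yield element

Note also that since your uploads are .gz files, beam.io.ReadAllFromText should handle them: its default compression_type is AUTO, which infers gzip compression from the .gz file extension.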

Full code here.

I tested it with the direct runner and the 2.14.0 SDK, using the public file gs://apache-beam-samples/shakespeare/kinglear.txt and a test message (not a real notification):

python notifications.py --streaming
gcloud pubsub topics publish $TOPIC_NAME --message='{"id": "apache-beam-samples/shakespeare/kinglear.txt/1565795872"}'

The pipeline then starts printing Shakespeare's King Lear:

INFO:root:File: gs://apache-beam-samples/shakespeare/kinglear.txt
INFO:oauth2client.transport:Attempting refresh to obtain initial access_token
...
INFO:root:  KING LEAR
INFO:root:
INFO:root:
INFO:root:  DRAMATIS PERSONAE
INFO:root:
INFO:root:
INFO:root:LEAR  king of Britain  (KING LEAR:)
INFO:root:
INFO:root:KING OF FRANCE: