
Evaluating Dataflow, and am trying to figure out if/how to do the following.

My apologies if anything below is trivial; we're trying to wrap our heads around Dataflow before we make a decision on using Beam, or something else like Spark, etc.

General use case is for machine learning:

  • Ingesting documents which are individually processed.

  • In addition to easy-to-write transforms, we'd like to enrich each document based on queries against databases (that are largely key-value stores).

  • A simple example would be a gazetteer: decompose the text into ngrams, check whether those ngrams exist in some database, and record (within a transformed version of the original doc) the entity identifiers that the matching phrases map to.
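
For concreteness, a minimal sketch of the ngram-decomposition step (plain Python, no Beam; the whitespace tokenization is just a placeholder):

def ngrams(tokens, max_n=3):
    """Yield all 1..max_n-grams of a token list as space-joined phrases."""
    for n in range(1, max_n + 1):
        for i in range(len(tokens) - n + 1):
            yield ' '.join(tokens[i:i + n])

# list(ngrams("the quick brown fox".split(), max_n=2))
# -> ['the', 'quick', 'brown', 'fox', 'the quick', 'quick brown', 'brown fox']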

How to do this efficiently?

NAIVE (although possibly tricky with the serialization requirement?):

Each document could simply query the database individually (similar to "Querying a relational database through Google DataFlow Transformer"), but since most of these are simple key-value stores, and per-element query latency is a real problem, it seems like there should be a more efficient way to do this.

SCENARIO #1: Improved?:

Current strawman is to store the tables in BigQuery, pull them down (https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/bigquery.py), and then use them as side inputs that serve as key-value lookups within the per-doc function(s).

Key-value tables range from very small to not huge (hundreds of MBs, maybe low GBs). The answer to "Multiple CoGroupByKey with same key apache beam" ("Side inputs can be arbitrarily large - there is no limit; we have seen pipelines successfully run using side inputs of 1+TB in size") suggests this is reasonable, at least from a size point of view.

1) Does this make sense? Is this the "correct" design pattern for this scenario?

2) If this is a good design pattern...how do I actually implement this?

https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/bigquery.py#L53 shows feeding the result to the document function as an AsList.

i) Presumably, AsDict is more appropriate here for the above use case? So I'd probably need to run some transformations on the BigQuery output first to turn it into (key, value) tuples, make sure the keys are unique, and then use it as a side input.

ii) Then I need to use the side input in the function.

What I'm not clear on:

  • For both of these, how to manipulate the output coming off the BigQuery pull is murky to me. How would I accomplish (i) (assuming it is necessary)? That is, what does the data format look like (raw bytes? strings?), and is there a good example I can look into?

  • Similarly, if AsDict is the correct way to pass it into the function, can I just reference it the way a dict is normally used in Python, e.g., side_input.get('blah')?

SCENARIO #2: Even more improved? (for specific cases):

  • The above scenario--if achievable--definitely does seem superior to continuous remote calls (given the simple key-value lookup), and would be very helpful for some of our scenarios. But if I take a scenario like a gazetteer lookup (as above)...is there an even more optimized solution?

Something like, for every doc, writing out all the ngrams as keys, with values being the underlying indices (docid + indices within the doc), then doing some sort of join between these ngrams and the phrases in our gazetteer...and then doing another set of transforms to recover the original docs (now with their new annotations).

I.e., let Beam handle all of the joins/lookups directly?

The theoretical advantage is that Beam may be a lot quicker at doing this than looping, for each doc, over all of the ngrams and checking whether each ngram is in the side_input.

Other key issues:

3) If this is a good way to do things, is there any trick to making this work well in the streaming scenario? Documentation elsewhere suggests that side-input caching works less well outside the batch scenario. Right now we're focused on batch, but streaming will become relevant for serving live predictions.

4) Any Beam-related reason to prefer Java over Python for any of the above? We've got a good amount of existing Python code to move to Dataflow, so we would heavily prefer Python...but I'm not sure whether there are any hidden issues with Python in the above (e.g., I've noticed Python doesn't support certain features or I/O connectors).

EDIT: Strawman for the example ngram lookup scenario (it should generalize well to general K:V lookups):

  • Phrases = get from bigquery
  • Docs (indexed by docid) (direct input from text or protobufs, e.g.)
  • Transform: phrases -> (phrase, entity) tuples
  • Transform: docs -> ngrams (phrase, docid, coordinates [in document])
  • CoGroupByKey key=phrase: (phrase, entity, docid, coords)
  • CoGroupByKey key=docid, group((phrase, entity, docid, coords), Docs)
  • Then we can iteratively finalize each doc, using the set of (phrase, entity, docid, coords) and each Doc
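
In (very rough, untested) Beam Python pseudocode, where ReadDocuments, extract_ngrams, annotate_doc, and phrase_query are placeholders for our own pieces:

import apache_beam as beam

with beam.Pipeline() as p:
    # (phrase, entity) tuples from the gazetteer table.
    phrases = (p
               | 'read phrases' >> beam.io.Read(beam.io.BigQuerySource(query=phrase_query))
               | 'kv phrases' >> beam.Map(lambda row: (row['phrase'], row['entity'])))

    # (docid, doc) pairs.
    docs = p | 'read docs' >> ReadDocuments()

    # (phrase, (docid, coords)) for every ngram of every doc.
    ngrams = (docs
              | 'explode ngrams' >> beam.FlatMap(
                  lambda kv: [(phrase, (kv[0], coords))
                              for phrase, coords in extract_ngrams(kv[1])]))

    # Join ngrams against the gazetteer on phrase.
    hits = ({'entities': phrases, 'mentions': ngrams}
            | 'join on phrase' >> beam.CoGroupByKey()
            | 'emit hits' >> beam.FlatMap(
                lambda kv: [(mention[0], (kv[0], entity, mention[1]))
                            for entity in kv[1]['entities']
                            for mention in kv[1]['mentions']]))

    # Regroup by docid and annotate each original doc.
    annotated = ({'docs': docs, 'hits': hits}
                 | 'join on docid' >> beam.CoGroupByKey()
                 | 'finalize' >> beam.Map(
                     lambda kv: annotate_doc(kv[1]['docs'][0], kv[1]['hits'])))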

Comments:
Pablo: Working on an answer to address your concerns now...
severian: Thanks Pablo. EDIT: added the strawman to the main question body (end of text) to improve the formatting. It feels a bit roundabout?

1 Answer


Regarding the scenarios for your pipeline:

  1. Naive scenario

You are right that per-element querying of a database is undesirable.

If your key-value store supports low-latency lookups over a reused open connection, you can define a connection that is initialized once per worker rather than once per bundle. This should be acceptable as long as your K-V store supports efficient lookups over an existing connection.
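
For example, a minimal sketch using DoFn.setup() (called once per DoFn instance on a worker in newer Python SDK versions); kv_client is a hypothetical client library, so substitute whatever your store provides:

import apache_beam as beam

class EnrichFromKvStore(beam.DoFn):
    def setup(self):
        # Runs once per DoFn instance on a worker, so the connection is
        # reused across bundles rather than re-opened for every element.
        import kv_client  # hypothetical client library
        self._client = kv_client.connect()

    def process(self, doc):
        # Per-element lookup over the already-open connection.
        value = self._client.get(doc['key'])
        yield dict(doc, additional_data=value)

# Usage: documents_pcoll | 'enrich' >> beam.ParDo(EnrichFromKvStore())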

  2. Improved scenario

If that's not feasible, then BQ is a great way to keep and pull in your data.

You can definitely use AsDict side inputs, and simply go side_input[my_key] or side_input.get(my_key).

Your pipeline could look something like so:

import apache_beam as beam
from apache_beam.pvalue import AsDict

kv_query = "SELECT key, value FROM my:table.name"

p = beam.Pipeline()
documents_pcoll = p | ReadDocuments()  # placeholder for your document source
additional_data_pcoll = (p 
                         | beam.io.Read(beam.io.BigQuerySource(query=kv_query))
                         # Make each row a (key, value) tuple.
                         | 'format bq' >> beam.Map(lambda row: (row['key'], row['value'])))

enriched_docs = (documents_pcoll 
                 | 'join' >> beam.Map(lambda doc, kv: enrich_doc(doc, kv[doc['key']]),
                                      kv=AsDict(additional_data_pcoll)))

Unfortunately, this has one shortcoming: Python does not currently support arbitrarily large side inputs (it loads the entire key-value set into a single Python dictionary). If your side-input data is large, you'll want to avoid this option.

Note: This will likely change in the future, but we can't be sure at the moment.

  3. Further improved

Another way of joining two datasets is to use CoGroupByKey. The loading of documents, and of K-V additional data should not change, but when joining, you'd do something like so:

# Turn the documents into key-value tuples as well.
documents_kv_pcoll = (documents_pcoll 
                      | 'format docs' >> beam.Map(lambda doc: (doc['key'], doc)))

enriched_docs = ({'docs': documents_kv_pcoll, 'additional_data': additional_data_pcoll}
                 | beam.CoGroupByKey()
                 # Each element is (key, {'docs': [...], 'additional_data': [...]}).
                 | 'enrich' >> beam.Map(lambda kv: enrich_doc(kv[1]['docs'][0],
                                                              kv[1]['additional_data'][0])))

CoGroupByKey will allow you to use arbitrarily large collections on either side.

Answering your questions

  1. You can see an example of using BigQuery as a side input in the cookbook. As you can see there, the data comes already parsed (I believe it arrives in its original data types, though it may come as string/unicode); there is a small illustration after this list. Check the docs (or feel free to ask) if you need to know more.

  2. Currently, Python streaming is in alpha, and it does not support side inputs; but it does support shuffle features such as CoGroupByKey. Your pipeline using CoGroupByKey should work well in streaming.

  3. A reason to prefer Java over Python is that all these features work in Java (unlimited-size side inputs, streaming side inputs). But it seems that for your use case, Python may have all you need.
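
To illustrate point 1: each element produced by the BigQuery read is a Python dict keyed by column name, e.g. for the kv_query above (illustrative values only):

# One element of the PCollection produced by the BigQuery read:
row = {'key': u'some_phrase', 'value': u'some_entity_id'}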

Note: The code snippets are approximate, but you should be able to debug them using the DirectRunner.

Feel free to ask for clarification, or to ask about other aspects if you feel like it'd help.