
I'm trying to build an ETL to load a dimension table. I'm using Apache Beam with Python, Dataflow, and BigQuery.

I need to assign a sequence number to each element of a PCollection in order to load it into BigQuery, but I can't find any way to do this.

I think I need Dataflow to perform the preceding aggregations and joins to produce my final PCollection, and only then add the sequence number. At that point, though, I would need to stop parallel processing and convert my PCollection to a list (as in Spark when you use .collect()), then run a simple loop to assign the sequence numbers. Is that right?

This is the pipeline I've coded:

p | ReadFromAvro(known_args.input) | beam.Map(adapt) | beam.GroupByKey() | beam.Map(adaptGroupBy) 

I've read that there is no way to get a list from a PCollection: "How to get a list of elements out of a PCollection in Google Dataflow and use it in the pipeline to loop Write Transforms?"

How can I achieve it? Any help?

Can you post what you have tried so far, and code? - Graham Polley
This is my first approach using Beam. I'm going to add my piece of code, but I can't find any way to do it. - Manuel Valero
Can you elaborate on why you think you need to add the sequence number? What is it that you plan to do in BigQuery that will require this sequence number? - Mikhail Berlyant
I need it to identify a PK in the dimension table. - Manuel Valero
It sounds then like you need just a unique key, not necessarily a sequence number? Can you use a randomly generated GUID? - jkff

1 Answer


If what you want is to get a list with each of the elements in a PCollection, you can use a side input. Keep in mind that this will remove all parallelism from your results, and your pipeline may become slow.

If you still want to do this, then:

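import apache_beam as beam

# Expose the whole PCollection as an iterable side input.
side_input_coll = beam.pvalue.AsIterable(my_collection)

(p
 | beam.Create([0])  # single dummy element, so the lambda below runs exactly once
 | beam.FlatMap(lambda _, my_seq: [(elem, i) for i, elem in enumerate(my_seq)],
                my_seq=side_input_coll))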

But don't forget that to preserve parallelism, it may be best to simply generate a random ID. Remember that PCollections are intrinsically unordered.
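The random-ID alternative could look something like the minimal sketch below. It assumes each element is a dict-like row; the function name `add_surrogate_key` and the `id` field are hypothetical and do not come from the question's pipeline. The function is plain Python, so it is exercised here on an ordinary list, and the comment shows where it would slot into a Beam pipeline.

```python
import uuid


def add_surrogate_key(row):
    """Return a copy of `row` with a random UUID string stored under 'id'.

    The 'id' field name is an assumption made for this illustration.
    """
    keyed = dict(row)               # copy, so the input element is not mutated
    keyed["id"] = str(uuid.uuid4())  # random 128-bit identifier, no coordination needed
    return keyed


# In a Beam pipeline this would be applied per element, in parallel, e.g.:
#   keyed_rows = rows | beam.Map(add_surrogate_key)
rows = [{"name": "a"}, {"name": "b"}]
keyed_rows = [add_surrogate_key(r) for r in rows]
```

Because every element gets its key independently, no side input or single-worker step is needed. The trade-off is that the keys are unique but not sequential.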

To learn more about side inputs, see the Side Inputs section of the Beam Programming Guide.