7 votes

I am having trouble understanding how we are supposed to test our pipeline when using Google Dataflow (based on Apache Beam) with the Python SDK.

https://beam.apache.org/documentation/pipelines/test-your-pipeline/
https://cloud.google.com/dataflow/pipelines/creating-a-pipeline-beam

The above links are for Java ONLY. I am pretty confused as to why Google would point Python users to Java testing documentation for Apache Beam.

I want to be able to view the results of a CoGroupByKey join on two PCollections. I am coming from a Python background, and I have little to no experience with Beam/Dataflow.

I could really use any help. I know this is open-ended to an extent; basically, I need a way to view results from within my pipeline, and not being able to do so is preventing me from seeing the results of my CoGroupByKey join.

Code below:

    # dwsku and product are PCollections coming from BigQuery. product
    # contains nested values as well; dwsku does not.
    d1 = {'dwsku': dwsku, 'product': product}
    results = d1 | beam.CoGroupByKey()
    print(results)

What is printed:

    PCollection[CoGroupByKey/Map(_merge_tagged_vals_under_key).None]
1 – I don't fully get the question, but maybe this helps? beam.apache.org/get-started/quickstart-py – Graham Polley

1 Answer

3 votes

If you want to test your pipeline locally on your machine, you should start by using the DirectRunner. Then you will be able to debug it, either by printing logs or by stopping the execution in a debugger.
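
For example, here is a minimal sketch of selecting the DirectRunner explicitly (the pipeline body is just a placeholder):

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    # Select the DirectRunner so the pipeline executes locally, in-process.
    options = PipelineOptions(runner='DirectRunner')
    p = beam.Pipeline(options=options)

    # ... build your transforms on p here ...

    result = p.run()
    result.wait_until_finish()  # block until the local run completes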

In order to see the whole PCollection locally, you can do the following:

    d1 = {'dwsku': dwsku, 'product': product}
    results = d1 | beam.CoGroupByKey()

    def my_debug_function(pcollection_as_list):
        # Add a breakpoint in this function, or just print the whole list.
        print(pcollection_as_list)

    debug = (results | beam.combiners.ToList() | beam.Map(my_debug_function))
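
Separately, if you want an actual unit test rather than print debugging, the Python SDK ships testing utilities (TestPipeline, assert_that, equal_to). A small self-contained sketch, with made-up input data standing in for your BigQuery rows:

    import apache_beam as beam
    from apache_beam.testing.test_pipeline import TestPipeline
    from apache_beam.testing.util import assert_that, equal_to

    with TestPipeline() as p:  # runs on exit from the with block
        dwsku = p | 'dwsku' >> beam.Create([('sku1', 'dw-a')])
        product = p | 'product' >> beam.Create([('sku1', 'p-a')])

        joined = {'dwsku': dwsku, 'product': product} | beam.CoGroupByKey()

        # Normalize the grouped iterables into sorted lists so the
        # comparison does not depend on iteration order or container type.
        formatted = joined | beam.Map(
            lambda kv: (kv[0], sorted(kv[1]['dwsku']), sorted(kv[1]['product'])))

        # assert_that fails the pipeline run if the contents do not match.
        assert_that(formatted, equal_to([('sku1', ['dw-a'], ['p-a'])]))

With the DirectRunner this executes entirely in-process, so it fits in a normal unittest/pytest suite.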

There are a few things to remember about this debugging approach:

  • the ToList() transform can potentially allocate a lot of memory, since it materializes the entire PCollection in a single in-memory list
  • while using the DirectRunner you should call the .wait_until_finish() method on the result of your pipeline's run(), so that your script does not end before the pipeline finishes executing (see the sketch after this list)
  • if your pipeline downloads data from BigQuery, you should put a LIMIT clause in the query when running locally
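
Putting those points together, a complete local-debugging sketch might look like this (the hard-coded data here stands in for your BigQuery reads, which should carry a LIMIT while you debug):

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    def my_debug_function(pcollection_as_list):
        # Put a breakpoint here, or just print the fully materialized join.
        print(pcollection_as_list)

    options = PipelineOptions(runner='DirectRunner')
    p = beam.Pipeline(options=options)

    # In your pipeline these come from BigQuery; hard-coded here so the
    # sketch is self-contained and runnable.
    dwsku = p | 'dwsku' >> beam.Create([('sku1', 'dw-a'), ('sku2', 'dw-b')])
    product = p | 'product' >> beam.Create([('sku1', 'p-a')])

    results = {'dwsku': dwsku, 'product': product} | beam.CoGroupByKey()
    debug = results | beam.combiners.ToList() | beam.Map(my_debug_function)

    p.run().wait_until_finish()  # keep the script alive until the run finishes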