0 votes

We recently migrated our infrastructure to GCP and we are keen to use Dataproc (Spark) and Dataflow (Apache Beam) for our data pipelines. Dataproc is fairly straightforward to get working, but running Dataflow is giving us headaches:

How can we run a Dataflow job from a Jupyter notebook (like an AI Platform notebook)?

The example is the following: I have a huge dataset that I want to group by, then filter, then run some calculations on, and finally write an object to a specific bucket (right now this code, I do not know how, is deleting the bucket instead of doing something useful):

import datetime, os
import apache_beam as beam

def preprocess(in_test_mode):
    import shutil, os, subprocess
    job_name = 'hola'

    if in_test_mode:
        print('Launching local job ... hang on')
        OUTPUT_DIR = './preproc'
        shutil.rmtree(OUTPUT_DIR, ignore_errors=True)
        os.makedirs(OUTPUT_DIR)
    else:
        print('Launching Dataflow job {} ... hang on'.format(job_name))
        OUTPUT_DIR = 'gs://experimentos-con-humanos/'.format(BUCKET)
        try:
            subprocess.check_call('gsutil -m rm -r {}'.format(OUTPUT_DIR).split())
        except:
            pass

    options = {
      'staging_location': os.path.join(OUTPUT_DIR, 'temp'),
      'temp_location': os.path.join(OUTPUT_DIR, 'temp'),
      'job_name': job_name,
      'project': PROJECT,
      'region': REGION,
      'teardown_policy': 'TEARDOWN_ALWAYS',
      'no_save_main_session': True,
      'max_num_workers': 6
    }
    opts = beam.pipeline.PipelineOptions(flags = [], **options)

    if in_test_mode:
        RUNNER = 'DataflowRunner'
    else:
        RUNNER = 'DataflowRunner'

    p = beam.Pipeline(RUNNER, options = opts)
    (p 
         | 'ReadTable' >> beam.io.Read(beam.io.BigQuerySource(table_spec))    
         | 'hashAsKey' >> beam.Map(lambda r: (r['afi_hash'], r))
         | 'Transpose' >> beam.GroupByKey()
         | 'Filtro menos de 12' >> beam.Filter(lambda r: len(r[1]) >= 12 )    
         | 'calculos' >> beam.Map(calculos)
            #| 'Group and sum' >> beam.
            #| 'Format results' >> beam.
         | 'Write results' >> beam.Map(lambda r: print(r))
         | '{}_out'.format(1) >> beam.io.Write(beam.io.WriteToText(os.path.join(OUTPUT_DIR, '{}.csv'.format(1))))
        )

    job = p.run()
    if in_test_mode:
        job.wait_until_finish()
        print("Done!")

preprocess(in_test_mode = False)

1) It does not work, although it does run! 2) The code works if I change 'DataflowRunner' to 'DirectRunner', which means it works locally. 3) If I do not change that, the job won't appear in Dataflow; instead it will delete the GCP bucket it works with.

PS: I do have admin permissions for Storage, Dataflow and BigQuery. PS2: The table does exist, and I have quadruple-checked that the bucket has the exact right name. PS3: I would like to make it work from a Jupyter notebook, but it is not strictly necessary, in case anybody wonders.

Hi there, are you able to run a Dataflow job outside of the notebook environment? Can you also please double-check that you followed the directions in the Python quickstart guide at cloud.google.com/dataflow/docs/quickstarts/quickstart-python? I want to make sure that you have the proper APIs enabled and the service account keys correctly set up. – Cubez
@Cubez thanks for the response. We have been able to do it using a .py file. We have already enabled the APIs and gave the AI notebook network user permission. About the service key: I supposed that, since it is an AI notebook, it theoretically should not need one. When we run it outside the notebook it says ``` Workflow failed. Causes: Network default is not accessible to Dataflow Service account or does not.. ``` – DonCharlie
It looks like you are actually deleting everything inside the first if statement. Aren't you missing {} when building OUTPUT_DIR? – Guillem Xercavins
Yes, I got rid of the if statements and made it more Jupyter-like. Now my problems seem more related to this: stackoverflow.com/questions/51362560/…. If you prefer, write it up as an answer and I can give you the green tick! – DonCharlie
Thanks for confirming, adding an answer now. – Guillem Xercavins

1 Answer

2 votes

As said in the comments, the issue seems to be in the pre-processing part, in particular this part, which is executed differently when working locally and when using the DataflowRunner:

if in_test_mode:
    print('Launching local job ... hang on')
    OUTPUT_DIR = './preproc'
    shutil.rmtree(OUTPUT_DIR, ignore_errors=True)
    os.makedirs(OUTPUT_DIR)
else:
    print('Launching Dataflow job {} ... hang on'.format(job_name))
    OUTPUT_DIR = 'gs://experimentos-con-humanos/'.format(BUCKET)
    try:
        subprocess.check_call('gsutil -m rm -r {}'.format(OUTPUT_DIR).split())
    except:
        pass

seems to be responsible for deleting the bucket contents (which are used for output, temp files, etc.). Also note that in the example you're not actually adding BUCKET to OUTPUT_DIR.
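
For reference, here is a minimal sketch of one possible fix (assuming BUCKET holds the bucket name without the gs:// prefix, e.g. 'experimentos-con-humanos', and that keeping the job's files under a preproc/ prefix is acceptable; both are assumptions, not necessarily your exact setup):

import subprocess

BUCKET = 'experimentos-con-humanos'  # assumption: bucket name without the gs:// prefix
OUTPUT_DIR = 'gs://{}/preproc/'.format(BUCKET)  # note the {} placeholder that was missing

try:
    # only clears objects under the preproc/ prefix, not the whole bucket
    subprocess.check_call('gsutil -m rm -r {}'.format(OUTPUT_DIR).split())
except subprocess.CalledProcessError:
    pass  # nothing to delete yet, e.g. on the first run

This way gsutil only cleans up the job's own output prefix on re-runs, instead of removing everything under gs://experimentos-con-humanos/ (which is what happens when rm -r is pointed at the bucket root).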