We recently migrated our infrastructure to GCP and we are keen to use Dataproc (Spark) and Dataflow (Apache Beam) for our data pipelines. Dataproc was fairly straightforward to get working, but running Dataflow is giving us headaches:

How can we run a Dataflow job from a Jupyter notebook (such as an AI Platform Notebook)?

The example is the following: I have a huge dataset that I want to group by a key, then filter and run some calculations on, and finally write an object to a specific bucket (right now this code is, somehow, deleting the bucket instead of doing anything useful):
import datetime, os
import apache_beam as beam

# PROJECT, REGION, BUCKET, table_spec and calculos are defined earlier in the notebook

def preprocess(in_test_mode):
    import shutil, os, subprocess

    job_name = 'hola'

    if in_test_mode:
        print('Launching local job ... hang on')
        OUTPUT_DIR = './preproc'
        shutil.rmtree(OUTPUT_DIR, ignore_errors=True)
        os.makedirs(OUTPUT_DIR)
    else:
        print('Launching Dataflow job {} ... hang on'.format(job_name))
        OUTPUT_DIR = 'gs://experimentos-con-humanos/'.format(BUCKET)
        try:
            subprocess.check_call('gsutil -m rm -r {}'.format(OUTPUT_DIR).split())
        except:
            pass

    options = {
        'staging_location': os.path.join(OUTPUT_DIR, 'temp'),
        'temp_location': os.path.join(OUTPUT_DIR, 'temp'),
        'job_name': job_name,
        'project': PROJECT,
        'region': REGION,
        'teardown_policy': 'TEARDOWN_ALWAYS',
        'no_save_main_session': True,
        'max_num_workers': 6
    }
    opts = beam.pipeline.PipelineOptions(flags=[], **options)

    if in_test_mode:
        RUNNER = 'DataflowRunner'
    else:
        RUNNER = 'DataflowRunner'

    p = beam.Pipeline(RUNNER, options=opts)
    (p
     | 'ReadTable' >> beam.io.Read(beam.io.BigQuerySource(table_spec))
     | 'hashAsKey' >> beam.Map(lambda r: (r['afi_hash'], r))
     | 'Transpose' >> beam.GroupByKey()
     | 'Filtro menos de 12' >> beam.Filter(lambda r: len(r[1]) >= 12)
     | 'calculos' >> beam.Map(calculos)
     #| 'Group and sum' >> beam.
     #| 'Format results' >> beam.
     | 'Write results' >> beam.Map(lambda r: print(r))
     | '{}_out'.format(1) >> beam.io.Write(beam.io.WriteToText(os.path.join(OUTPUT_DIR, '{}.csv'.format(1))))
    )

    job = p.run()
    if in_test_mode:
        job.wait_until_finish()
        print("Done!")

preprocess(in_test_mode=False)
1) It does not do anything useful, although the job does run.

2) The code works if I change 'DataflowRunner' to 'DirectRunner', i.e. it works locally.

3) If I do not change the runner, the job does not appear in Dataflow; instead it deletes the GCP bucket it is supposed to write to (see the sketch below).
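For reference, a minimal sketch of the runner selection that points 2) and 3) imply (the code above assigns 'DataflowRunner' in both branches of the if); opts is the PipelineOptions object built above:

    # Sketch only: pick the runner from the test flag
    if in_test_mode:
        RUNNER = 'DirectRunner'    # runs the pipeline locally, no Dataflow job is created
    else:
        RUNNER = 'DataflowRunner'  # submits the job to the Dataflow service
    p = beam.Pipeline(RUNNER, options=opts)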
PS: I do have admin permissions for Storage, Dataflow and BigQuery.
PS2: The table does exist, and I have quadruple-checked that the bucket has the exact name.
PS3: I would like to make it work from a Jupyter notebook, but it is not strictly necessary, in case anybody wonders.
Comment: "[...] if statement. Aren't you missing {} when building OUTPUT_DIR?" – Guillem Xercavins
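A minimal sketch of what that comment points at: the format string has no {} placeholder, so .format(BUCKET) leaves it unchanged and OUTPUT_DIR ends up being the bucket root, which the gsutil -m rm -r call then wipes. Adding the placeholder plus a subdirectory (the name 'preproc' here is just an assumption) keeps the cleanup scoped to that prefix:

    OUTPUT_DIR = 'gs://{}/preproc/'.format(BUCKET)  # e.g. gs://experimentos-con-humanos/preproc/
    try:
        # only removes objects under the preproc/ prefix, not the bucket itself
        subprocess.check_call('gsutil -m rm -r {}'.format(OUTPUT_DIR).split())
    except subprocess.CalledProcessError:
        pass  # nothing to clean up yet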