I've been trying to get my pipeline to run with a classic template on Dataflow.
The pipeline is supposed to read the runtime parameters from_date and to_date, pass them on to a REST API, and write the API's response into a BigQuery table.
It runs without errors on Dataflow, but the data simply does not appear in the BigQuery table that serves as the sink. When I execute the pipeline locally, it works like a charm: no errors, and I can write to BigQuery using a service account and local files.
I suspect that I misunderstand what is available to the pipeline steps in the different environments and that no data is actually passed along the pipeline.
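For reference, the pattern I have seen in the classic-template documentation hands the ValueProvider itself to the DoFn and only resolves it with .get() inside process(), i.e. at execution time on the workers. A minimal sketch of that pattern, not my actual code (FetchDates is a hypothetical name):

class FetchDates(beam.DoFn):
    def __init__(self, from_date, to_date):
        # The ValueProviders are stored as-is; .get() is deferred to process().
        self.from_date = from_date
        self.to_date = to_date

    def process(self, element):
        # Resolved only at runtime, so template parameters are visible here.
        yield (self.from_date.get(), self.to_date.get())

user_options = pipeline_options.view_as(UserOptions)
dates = (p1
         | 'seed' >> beam.Create([None])
         | 'resolve dates' >> beam.ParDo(FetchDates(user_options.from_date, user_options.to_date)))

This differs from what I do below (I pass the ValueProviders straight into beam.Create), but I am not sure whether that is the actual problem.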
The requests package might not be available on the Dataflow runner, but in that case I would expect an error message...
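If requests really is missing on the workers, my understanding from the pipeline-dependencies documentation is that it has to be shipped explicitly when the job or template is built, for example via a requirements file. A minimal sketch of how that would look in my launcher, assuming a requirements.txt that lists requests next to the script (I have not tried this yet):

from apache_beam.options.pipeline_options import SetupOptions

# Stage the packages listed in requirements.txt onto the Dataflow workers.
pipeline_options.view_as(SetupOptions).requirements_file = 'requirements.txt'

The same thing should be achievable by passing --requirements_file requirements.txt on the command line.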
When I ran it on Dataflow but wrote to text instead (the commented-out line below), a folder was created on Cloud Storage but no file ever appeared inside it. I suspect this is also why I cannot get any of my debug messages to show up in the monitoring UI.
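One thing I intend to try next is a pass-through step that only logs whatever reaches it, wired between 'fetch API data' and 'split records'. A sketch; LogElement is a hypothetical helper, not part of the pipeline code that follows, and I log at INFO because I am not sure DEBUG-level worker logs are surfaced by default:

class LogElement(beam.DoFn):
    def process(self, element):
        # Log and forward the element unchanged.
        logging.info("element seen: %s", element)
        yield element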
Help much appreciated - here is my pipeline code:
#!/usr/bin/env python
# coding: utf-8

import logging
import argparse

# Beam/Dataflow related imports
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions, GoogleCloudOptions
from apache_beam.io.gcp.internal.clients import bigquery
from apache_beam.options.value_provider import RuntimeValueProvider

# Handling of API calls
import requests
import json


class get_api_data(beam.DoFn):
    def __init__(self):
        logging.debug("fetching api data")

    def process(self, dates):
        bearer_token = "api_secret"
        from_date = str(dates[0])
        to_date = str(dates[1])
        logging.debug("Now fetching from ", from_date, " to ", to_date)

        payload = {'stuff': 'stuff',
                   'from': from_date,
                   'to': to_date,
                   'other_stuff': 'other_stuff'
                   }
        payload = json.dumps(payload)

        headers = {
            'Content-Type': 'application/json',
            'Authorization': 'Bearer ' + bearer_token,
            'Accept': 'application/json',
            'Content-Type': 'application/json'
        }

        r = requests.post("api_url", data=payload, headers=headers)
        return [line.decode("utf-8") for line in r.iter_lines()][1:]


class Split(beam.DoFn):
    def process(self, element):
        try:
            pid, date, another_kpi, yet_another_kpi = element.split(",")
            logging.debug(" | ".join(element.split(",")))
        except ValueError:
            logging.error(" | ".join(element.split(",")))

        return [{
            'pid': str(pid),
            'date': str(date),
            'another_kpi': int(another_kpi),
            'yet_another_kpi': float(yet_another_kpi)
        }]


class UserOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument('--to_date', dest='to_date', type=str)
        parser.add_value_provider_argument('--from_date', dest='from_date', type=str)


def run(argv=None):
    parser = argparse.ArgumentParser()
    path_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args)
    print("Google Cloud Options: ", pipeline_options.view_as(GoogleCloudOptions))

    from_date = pipeline_options.view_as(UserOptions).from_date
    to_date = pipeline_options.view_as(UserOptions).to_date
    logging.debug("Data from ", from_date, " to ", to_date)

    table_spec = bigquery.TableReference(
        projectId='my_project',
        datasetId='my_dataset',
        tableId='my_table')

    table_schema = 'pid:STRING, date:STRING, another_kpi:INT64, yet_another_kpi:FLOAT64'

    p1 = beam.Pipeline(options=pipeline_options)

    ingest_data = (
        p1
        | 'pass dates' >> beam.Create([[from_date, to_date]])
        | 'fetch API data' >> beam.ParDo(get_api_data())
        | 'split records' >> beam.ParDo(Split())
        | 'write into gbq' >> beam.io.gcp.bigquery.WriteToBigQuery(
              table=table_spec,
              schema=table_schema,
              write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
              create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
        # | 'write to text' >> beam.io.WriteToText("./test_v2.csv")
    )

    result = p1.run()


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.DEBUG)
    run()
Are you shipping the requests package using a requirements.txt? beam.apache.org/documentation/sdks/python-pipeline-dependencies – Peter Kim