
I've been trying to get my pipeline to run with a classic template on Dataflow.

The pipeline is supposed to read the runtime parameters from_date and to_date and pass them on to a REST API. The API response should then be written to a BigQuery table.

It runs without any errors on Dataflow, but my data simply does not appear in the BigQuery table that serves as the data sink. When I execute it locally, it works like a charm: no errors, and I can write to BigQuery using a service account and local files.

I suspect that I misunderstand what is available to the pipeline steps in the different environments and that no data is actually passed along the pipeline.

The requests package might not be available on the Dataflow runner, but I would expect an error message...

When I ran it on Dataflow but wrote to text instead (commented-out line below), a folder was created on Cloud Storage but no file appeared inside.

I also suspect that this is why none of my debug messages show up in the monitoring UI.
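A separate thing worth ruling out for the missing debug messages, independent of Dataflow: the logging calls in the code below pass extra positional arguments without any %s placeholders in the message. The standard logging module treats those arguments as %-format arguments, so formatting fails and the message is not emitted. Below is a minimal standard-library sketch of the difference; the logger name "demo" and the capture() helper are made up for illustration. Whether the Dataflow workers additionally filter out DEBUG-level records is an assumption I have not verified here.

```python
import io
import logging


def capture(log_call):
    """Run log_call(logger) and return the text the handler wrote."""
    buffer = io.StringIO()
    logger = logging.getLogger("demo")  # hypothetical logger name
    logger.setLevel(logging.DEBUG)
    logger.propagate = False
    handler = logging.StreamHandler(buffer)
    logger.addHandler(handler)
    try:
        log_call(logger)
    finally:
        logger.removeHandler(handler)
    return buffer.getvalue()


from_date, to_date = "2020-01-01", "2020-01-31"

# Print-style arguments: the message has no placeholders, so formatting
# fails inside the handler and nothing reaches the output stream
# (logging reports the failure on stderr instead).
broken = capture(
    lambda log: log.debug("Now fetching from ", from_date, " to ", to_date))

# Placeholder style: the arguments are substituted into the message.
fixed = capture(
    lambda log: log.debug("Now fetching from %s to %s", from_date, to_date))

print(repr(broken))
print(repr(fixed))
```

With the placeholder form the message is at least produced; whether it then appears in the monitoring UI still depends on the log level configured for the workers.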

Help much appreciated - here is my pipeline code:

#!/usr/bin/env python
# coding: utf-8

import logging
import argparse

# Beam/Dataflow related imports
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions, GoogleCloudOptions
from apache_beam.io.gcp.internal.clients import bigquery
from apache_beam.options.value_provider import RuntimeValueProvider

# Handling of API calls
import requests
import json


class get_api_data(beam.DoFn):
    def __init__(self):
        logging.debug("fetching api data")

    def process(self, dates):

        bearer_token = "api_secret"

        from_date = str(dates[0])
        to_date = str(dates[1])

        logging.debug("Now fetching from %s to %s", from_date, to_date)

        payload = {'stuff': 'stuff',
                   'from': from_date,
                   'to': to_date,
                   'other_stuff': 'other_stuff'
                   }

        payload = json.dumps(payload)

        headers = {
            'Content-Type': 'application/json',
            'Authorization': 'Bearer ' + bearer_token,
            'Accept': 'application/json',
        }

        r = requests.post("api_url", data=payload, headers=headers)

        return [line.decode("utf-8") for line in r.iter_lines()][1:]


class Split(beam.DoFn):
    def process(self, element):

        try:
            pid, date, another_kpi, yet_another_kpi = element.split(",")
            logging.debug(" | ".join(element.split(",")))
        except ValueError:
            # Malformed line: log it and emit nothing. Without this early
            # return, the code below would fail on the unbound names.
            logging.error("Could not split record: %s", element)
            return []

        return [{
            'pid': str(pid),
            'date': str(date),
            'another_kpi': int(another_kpi),
            'yet_another_kpi': float(yet_another_kpi)
        }]


class UserOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        
        parser.add_value_provider_argument('--to_date', dest='to_date', type=str) 
        parser.add_value_provider_argument('--from_date', dest='from_date', type=str)


def run(argv=None):
  
    parser = argparse.ArgumentParser()
    path_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args)

    print("Google Cloud Options: ", pipeline_options.view_as(GoogleCloudOptions))

    from_date = pipeline_options.view_as(UserOptions).from_date
    to_date = pipeline_options.view_as(UserOptions).to_date

    logging.debug("Data from %s to %s", from_date, to_date)

    table_spec = bigquery.TableReference(
        projectId='my_project',
        datasetId='my_dataset',
        tableId='my_table')

    table_schema = 'pid:STRING, date:STRING, another_kpi:INT64, yet_another_kpi:FLOAT64'

    p1 = beam.Pipeline(options=pipeline_options)

    ingest_data = (
        p1
        | 'pass dates' >> beam.Create([[from_date, to_date]])
        | 'fetch API data' >> beam.ParDo(get_api_data()) 
        | 'split records' >> beam.ParDo(Split())
        | 'write into gbq' >> beam.io.gcp.bigquery.WriteToBigQuery(
            table=table_spec,
            schema=table_schema,
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
        #| 'write to text' >> beam.io.WriteToText("./test_v2.csv")
    )

    result = p1.run()

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.DEBUG)
    run()
Also, if anyone knows how to throw an error and stop the pipeline if it runs dry, i.e. no more data is passed along, that would help me debug. - Manuel Huppertz
Do you tell Dataflow to install requests using requirements.txt? beam.apache.org/documentation/sdks/python-pipeline-dependencies - Peter Kim
As per cloud.google.com/dataflow/docs/concepts/…, requests is already installed. Also, as I mentioned, there are no errors regarding this module. - Manuel Huppertz
After testing with your code, I narrowed the issue down to the write disposition, as I see rows when I use WRITE_APPEND. Which SDK version are you using? Is this in your SDK? github.com/apache/beam/pull/10668/… - Peter Kim
Actually found the solution a minute ago here. - Manuel Huppertz

1 Answer


Using a ValueProvider in combination with Create is apparently not supported, although I did not get an error message.
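My understanding of why, as a toy sketch rather than Beam's actual implementation: a runtime parameter is only a placeholder while the template is being built, and its value becomes available once the template is launched. The DeferredValue class below is a hypothetical stand-in written for this illustration, not a Beam class. In the question, str(dates[0]) was applied to the provider object itself, so presumably the API received a description of the placeholder instead of a date.

```python
class DeferredValue:
    """Hypothetical stand-in for a runtime value provider (not Beam's class)."""

    def __init__(self, name):
        self.name = name
        self._value = None
        self._resolved = False

    def resolve(self, value):
        # Simulates the runner supplying the parameter at launch time.
        self._value = value
        self._resolved = True

    def get(self):
        if not self._resolved:
            raise RuntimeError(f"{self.name} is not available yet")
        return self._value

    def __repr__(self):
        return f"DeferredValue(option: {self.name})"


from_date = DeferredValue("from_date")

# Template build time: the value is unknown, so str() only describes
# the wrapper object, not the date.
at_build_time = str(from_date)

# Launch time: the parameter is supplied, and get() returns the real value.
from_date.resolve("2020-01-01")
at_run_time = from_date.get()

print(at_build_time)
print(at_run_time)
```

The point of the sketch is only that the value has to be read with get() inside a step that executes at run time, not captured while the pipeline graph is being constructed.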

I solved it by using:

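class OutputValueProviderFn(beam.DoFn):
    def __init__(self, vp1, vp2):
        # Keep the ValueProviders themselves; they are resolved later.
        self.vp1 = vp1
        self.vp2 = vp2

    def process(self, unused_elm):
        # .get() is called here, while the pipeline is executing.
        logging.info("Providing dates: %s, %s", self.vp1.get(), self.vp2.get())
        yield [self.vp1.get(), self.vp2.get()]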
...

from_date = pipeline_options.view_as(UserOptions).from_date
to_date = pipeline_options.view_as(UserOptions).to_date

pipel = (
        p1
        | 'Start Pipeline' >> beam.Create([None])
        | 'Read from and to date' >> beam.ParDo(OutputValueProviderFn(from_date, to_date))
        | 'fetch API data' >> beam.ParDo(get_api_data())
        ...
    )

Inspiration here