I am building a Dataflow job that gets data from Cloud Storage, passes it to the NLP API for sentiment analysis, and imports the results into BigQuery.

The job ran successfully locally (when I didn't use the Dataflow runner).

import apache_beam as beam
import logging
from google.cloud import language
from google.cloud.language import enums
from google.cloud.language import types


PROJECT = 'ProjectName'
schema = 'name:STRING,id:STRING,date:STRING,title:STRING,text:STRING,magnitude:STRING,score:STRING'
src_path = "gs://amazoncustreview/sentimentSource.csv"


class Sentiment(beam.DoFn):
    def process(self, element):
        # Naive CSV split; assumes no quoted commas inside the fields.
        element = element.split(",")
        client = language.LanguageServiceClient()
        # element[2] holds the text to analyze.
        document = types.Document(content=element[2],
                                  type=enums.Document.Type.PLAIN_TEXT)
        sentiment = client.analyze_sentiment(document).document_sentiment
        return [{
            'name': element[0],
            'title': element[1],
            'magnitude': sentiment.magnitude,
            'score': sentiment.score
        }]


def main():
    BUCKET = 'BucketName'
    argv = [
      '--project={0}'.format(PROJECT),
      '--staging_location=gs://{0}/staging/'.format(BUCKET),
      '--temp_location=gs://{0}/staging/'.format(BUCKET),
      '--runner=DataflowRunner',
      '--job_name=examplejob2',
      '--save_main_session'
    ]
    p = beam.Pipeline(argv=argv)

    (p
       | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
       | 'ParseCSV' >> beam.ParDo(Sentiment())
       | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
             '{0}:Dataset.table'.format(PROJECT),
             schema=schema,
             write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
    )
    p.run()


if __name__ == '__main__':
    main()

This is the error I get. I have tried importing different versions of Google Cloud Language, but all my attempts failed.

Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 773, in run
    self._load_main_session(self.local_staging_directory)
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 489, in _load_main_session
    pickler.load_session(session_file)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/internal/pickler.py", line 280, in load_session
    return dill.load_session(file_path)
  File "/usr/local/lib/python2.7/dist-packages/dill/_dill.py", line 410, in load_session
    module = unpickler.load()
  File "/usr/lib/python2.7/pickle.py", line 864, in load
    dispatch[key](self)
  File "/usr/lib/python2.7/pickle.py", line 1139, in load_reduce
    value = func(*args)
  File "/usr/local/lib/python2.7/dist-packages/dill/_dill.py", line 828, in _import_module
    return getattr(__import__(module, None, None, [obj]), obj)
ImportError: No module named language_v1.gapic

1 Answer

It seems to be a version mismatch between the google-cloud-language package you have locally and the one installed on the Dataflow workers. To solve it, create a requirements.txt file and pin a version, for example google-cloud-language==1.3.0.
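
The requirements.txt then contains just the pinned dependency:

google-cloud-language==1.3.0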

Then, add '--requirements_file=requirements.txt' to the options arguments of your pipeline, so the workers install the same version.

I tested it with essentially the pipeline from the question plus that flag, and it worked for me.

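A minimal sketch of that setup (the original code was posted as a screenshot; this reconstruction reuses the question's pipeline and placeholder names like ProjectName, BucketName, and Dataset.table, which you would replace with your own):

import apache_beam as beam
from google.cloud import language
from google.cloud.language import enums
from google.cloud.language import types

PROJECT = 'ProjectName'
BUCKET = 'BucketName'
# Schema trimmed to the fields actually written by the DoFn.
schema = 'name:STRING,title:STRING,magnitude:STRING,score:STRING'
src_path = 'gs://amazoncustreview/sentimentSource.csv'


class Sentiment(beam.DoFn):
    def process(self, element):
        # Naive CSV split; assumes no quoted commas inside the fields.
        element = element.split(',')
        client = language.LanguageServiceClient()
        document = types.Document(content=element[2],
                                  type=enums.Document.Type.PLAIN_TEXT)
        sentiment = client.analyze_sentiment(document).document_sentiment
        return [{
            'name': element[0],
            'title': element[1],
            'magnitude': sentiment.magnitude,
            'score': sentiment.score
        }]


def main():
    argv = [
        '--project={0}'.format(PROJECT),
        '--staging_location=gs://{0}/staging/'.format(BUCKET),
        '--temp_location=gs://{0}/staging/'.format(BUCKET),
        '--runner=DataflowRunner',
        '--job_name=examplejob2',
        '--save_main_session',
        # Ship the pinned google-cloud-language dependency to the workers.
        '--requirements_file=requirements.txt'
    ]
    p = beam.Pipeline(argv=argv)

    (p
       | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
       | 'ParseCSV' >> beam.ParDo(Sentiment())
       | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
             '{0}:Dataset.table'.format(PROJECT),
             schema=schema,
             write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
    )
    p.run()


if __name__ == '__main__':
    main()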