I am building a Dataflow job that reads data from Cloud Storage, passes it to the Natural Language API for sentiment analysis, and imports the results into BigQuery.
The job ran successfully locally (I did not use the Dataflow runner).
import apache_beam as beam
import logging
from google.cloud import language
from google.cloud.language import enums
from google.cloud.language import types
# GCP project id; used for the --project option and as the prefix of the
# BigQuery table spec.
PROJECT = 'ProjectName'

# BigQuery schema string for the destination table.
# NOTE(review): nothing in the visible code passes this to WriteToBigQuery,
# and it declares magnitude/score as STRING while the DoFn emits the API's
# numeric values as-is -- confirm against the real table.
schema = (
    'name : STRING, id : STRING, date : STRING,title : STRING, '
    'text: STRING,magnitude : STRING, score : STRING'
)

# Cloud Storage object read line by line as the pipeline input.
src_path = "gs://amazoncustreview/sentimentSource.csv"
class Sentiment(beam.DoFn):
    """Score one comma-separated line with the Cloud Natural Language API.

    Field 0 of the line is emitted as ``name``, field 1 as ``title`` and
    field 2 is the text submitted for sentiment analysis.
    """

    # API client, created on first use and then reused for every element
    # handled by this DoFn instance instead of being rebuilt per element.
    _client = None

    def _get_client(self):
        """Return the cached LanguageServiceClient, creating it if needed."""
        if self._client is None:
            self._client = language.LanguageServiceClient()
        return self._client

    def process(self, element):
        """Analyze the sentiment of one input line.

        Args:
            element: One line of the input file, as a comma-separated string.

        Returns:
            A one-item list holding a dict with the keys ``name``, ``title``,
            ``magnitude`` and ``score``, or an empty list when the line has
            fewer than three fields.
        """
        # NOTE: a plain split does not honour CSV quoting, so a quoted field
        # that itself contains commas is cut apart here.
        fields = element.split(",")
        if len(fields) < 3:
            # Skip the malformed line rather than raising IndexError, which
            # would fail the whole bundle.
            logging.warning(
                "Skipping line with fewer than 3 fields: %r", element)
            return []

        document = types.Document(
            content=fields[2],
            type=enums.Document.Type.PLAIN_TEXT)
        sentiment = self._get_client().analyze_sentiment(
            document).document_sentiment
        return [{
            'name': fields[0],
            'title': fields[1],
            'magnitude': sentiment.magnitude,
            'score': sentiment.score
        }]
def main():
    """Build the sentiment pipeline and submit it to the Dataflow runner.

    The pipeline reads lines from ``src_path``, runs each one through
    ``Sentiment`` and appends the resulting rows to the BigQuery table
    ``<PROJECT>:Dataset.table``.
    """
    import os  # local import: only needed to probe for the requirements file

    BUCKET = 'BucKet name'
    REQUIREMENTS_FILE = 'requirements.txt'
    argv = [
        '--project={0}'.format(PROJECT),
        '--staging_location=gs://{0}/staging/'.format(BUCKET),
        '--temp_location=gs://{0}/staging/'.format(BUCKET),
        '--runner=DataflowRunner',
        '--job_name=examplejob2',
        '--save_main_session'
    ]

    # --save_main_session pickles this module's globals, including the
    # google.cloud.language imports, and the worker re-imports them when it
    # loads the session. The worker therefore needs the same
    # google-cloud-language version as the launching machine; shipping a
    # requirements file makes Dataflow install it on the workers.
    if os.path.isfile(REQUIREMENTS_FILE):
        argv.append('--requirements_file={0}'.format(REQUIREMENTS_FILE))
    else:
        logging.warning(
            "%s not found; workers will only have their preinstalled "
            "packages, so importing google.cloud.language may fail there.",
            REQUIREMENTS_FILE)

    p = beam.Pipeline(argv=argv)
    # NOTE(review): no schema is passed to WriteToBigQuery, so the destination
    # table presumably already exists -- confirm.
    (p
     | 'ReadData' >> beam.io.textio.ReadFromText(src_path)
     | 'ParseCSV' >> beam.ParDo(Sentiment())
     | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
         '{0}:Dataset.table'.format(PROJECT),
         write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
    p.run()


# Submit the job only when this file is executed as a script.
if __name__ == '__main__':
    main()
This is the error I get. I have tried importing different versions of Google Cloud Language, but all my attempts failed.
Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 773, in run
self._load_main_session(self.local_staging_directory)
File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 489, in _load_main_session
pickler.load_session(session_file)
File "/usr/local/lib/python2.7/dist-packages/apache_beam/internal/pickler.py", line 280, in load_session
return dill.load_session(file_path)
File "/usr/local/lib/python2.7/dist-packages/dill/_dill.py", line 410, in load_session
module = unpickler.load()
File "/usr/lib/python2.7/pickle.py", line 864, in load
dispatch[key](self)
File "/usr/lib/python2.7/pickle.py", line 1139, in load_reduce
value = func(*args)
File "/usr/local/lib/python2.7/dist-packages/dill/_dill.py", line 828, in _import_module
return getattr(__import__(module, None, None, [obj]), obj)
ImportError: No module named language_v1.gapic