I have written the code below to read streaming data from Pub/Sub and write it to Google Cloud Storage.
import apache_beam as beam

def run():
    argv = [
        '--project={0}'.format(PROJECT),
        '--job_name=mypubsubsample40',
        # '--save_main_session',
        '--staging_location=gs://abc/staging/',
        '--temp_location=gs://abc/staging/',
        '--runner=DataflowRunner',
        '--streaming'
    ]
    p = beam.Pipeline(argv=argv)
    lines = (p
             | 'read_stream' >> beam.io.ReadStringsFromPubSub(
                 subscription='projects/myprojectid209306/subscriptions/mysub1',
                 id_label='MESSAGE_ID')
             | 'write to file' >> beam.io.WriteToText('gs://bagdfs2/myout'))
    p.run()  # submits the job; a streaming job keeps running until cancelled
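For completeness, PROJECT is defined elsewhere in the module (set to my GCP project id), and the job is launched with a plain main guard:

if __name__ == '__main__':
    run()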
When I execute this program with 'DirectRunner', the output files are created in GCS, but when I execute it with 'DataflowRunner' it does not work.
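One thing I am not sure about is whether the unbounded write needs an explicit window first. For reference, a windowed variant of the same pipeline would look roughly like this (a sketch only; I have not verified that it avoids the error below):

import apache_beam as beam
from apache_beam.transforms import window

def run_windowed(p):
    # Same read as above, with a 60-second fixed window inserted before the
    # write (an assumption on my part, not a confirmed fix).
    (p
     | 'read_stream' >> beam.io.ReadStringsFromPubSub(
         subscription='projects/myprojectid209306/subscriptions/mysub1',
         id_label='MESSAGE_ID')
     | 'window' >> beam.WindowInto(window.FixedWindows(60))
     | 'write to file' >> beam.io.WriteToText('gs://bagdfs2/myout'))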
Moreover, when the pipeline is running in Cloud Dataflow, after a minute or so I get the following error in the logs:
java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction -1775: Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 127, in _execute
response = task()
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 162, in <lambda>
self._execute(lambda: worker.do_instruction(work), work)
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 208, in do_instruction
request.instruction_id)
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 227, in process_bundle
self.data_channel_factory)
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 227, in __init__
self.ops = self.create_execution_tree(self.process_bundle_descriptor)
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 269, in create_execution_tree
descriptor.transforms, key=topological_height, reverse=True)])
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 204, in wrapper
result = cache[args] = func(*args)
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 252, in get_operation
in descriptor.transforms[transform_id].outputs.items()
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 251, in <dictcomp>
for tag, pcoll_id
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 204, in wrapper
result = cache[args] = func(*args)
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 255, in get_operation
transform_id, transform_consumers)
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 359, in create_operation
return creator(self, transform_id, transform_proto, payload, consumers)
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 481, in create
factory, transform_id, transform_proto, consumers, serialized_fn)
File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 529, in _create_pardo_operation
dofn_data = pickler.loads(serialized_fn)
File "/usr/local/lib/python2.7/dist-packages/apache_beam/internal/pickler.py", line 222, in loads
c = base64.b64decode(encoded)
File "/usr/lib/python2.7/base64.py", line 78, in b64decode
raise TypeError(msg)
TypeError: Incorrect padding
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:55)
com.google.cloud.dataflow.worker.fn.control.RegisterAndProcessBundleOperation.finish(RegisterAndProcessBundleOperation.java:274)
com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:83)
com.google.cloud.dataflow.worker.fn.control.BeamFnMapTaskExecutor.execute(BeamFnMapTaskExecutor.java:101)
com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1227)
com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:135)
com.google.cloud.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:966)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction -1775:
[same Python traceback as above, ending in "TypeError: Incorrect padding"]
I haven't even ingested any data into Pub/Sub, yet the error above appears as soon as I start running my pipeline in Cloud Dataflow.
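(For reference, if I did want to push test messages, I would publish them with the Pub/Sub client along these lines; 'mytopic1' is a placeholder, since only the subscription name appears in my pipeline.)

from google.cloud import pubsub_v1

# Hypothetical test publisher; the topic name is a placeholder.
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path('myprojectid209306', 'mytopic1')
publisher.publish(topic_path, data=b'test message')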
Please let me know if I'm doing anything wrong here.