
I have written the code below to read streaming data from Pub/Sub and write it to Google Cloud Storage.

import apache_beam as beam

def run():
    # PROJECT is assumed to be defined elsewhere in the script.
    argv = [
        '--project={0}'.format(PROJECT),
        '--job_name=mypubsubsample40',
        # '--save_main_session',
        '--staging_location=gs://abc/staging/',
        '--temp_location=gs://abc/staging/',
        '--runner=DataflowRunner',
        '--streaming'
    ]

    p = beam.Pipeline(argv=argv)
    lines = (p
             | 'read_stream' >> beam.io.ReadStringsFromPubSub(
                 subscription='projects/myprojectid-209306/subscriptions/mysub1',
                 id_label='MESSAGE_ID')
             | 'write to file' >> beam.io.WriteToText('gs://bagdfs2/myout'))
    p.run()

When I execute the same program with 'DirectRunner', the files are created in GCS, but when I execute it with 'DataflowRunner' it does not work.
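
For reference, the only thing I change between the two runs is the --runner flag. Below is a sketch of the same pipeline with the flags taken from the command line instead of the hard-coded argv list (this PipelineOptions variant is a rewording of my script, not exactly what I run, and mypipeline.py is just a placeholder file name):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

def run(argv=None):
    # Flags passed on the command line (--runner, --project, --staging_location,
    # --temp_location, ...) are forwarded to Beam, so the same script can be
    # launched locally or on Dataflow without editing code.
    options = PipelineOptions(argv)
    options.view_as(StandardOptions).streaming = True

    p = beam.Pipeline(options=options)
    (p
     | 'read_stream' >> beam.io.ReadStringsFromPubSub(
         subscription='projects/myprojectid-209306/subscriptions/mysub1',
         id_label='MESSAGE_ID')
     | 'write to file' >> beam.io.WriteToText('gs://bagdfs2/myout'))
    return p.run()

# Local test:
#   python mypipeline.py --runner=DirectRunner --project=<PROJECT>
# On Dataflow:
#   python mypipeline.py --runner=DataflowRunner --project=<PROJECT> \
#       --job_name=mypubsubsample40 --staging_location=gs://abc/staging/ \
#       --temp_location=gs://abc/staging/ --streaming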

Moreover, while the pipeline is running in Cloud Dataflow, after a minute or so I get the following error in the logs:

    java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction -1775: Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 127, in _execute
    response = task()
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 162, in <lambda>
    self._execute(lambda: worker.do_instruction(work), work)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 208, in do_instruction
    request.instruction_id)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 227, in process_bundle
    self.data_channel_factory)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 227, in __init__
    self.ops = self.create_execution_tree(self.process_bundle_descriptor)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 269, in create_execution_tree
    descriptor.transforms, key=topological_height, reverse=True)])
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 204, in wrapper
    result = cache[args] = func(*args)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 252, in get_operation
    in descriptor.transforms[transform_id].outputs.items()
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 251, in <dictcomp>
    for tag, pcoll_id
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 204, in wrapper
    result = cache[args] = func(*args)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 255, in get_operation
    transform_id, transform_consumers)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 359, in create_operation
    return creator(self, transform_id, transform_proto, payload, consumers)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 481, in create
    factory, transform_id, transform_proto, consumers, serialized_fn)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 529, in _create_pardo_operation
    dofn_data = pickler.loads(serialized_fn)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/internal/pickler.py", line 222, in loads
    c = base64.b64decode(encoded)
  File "/usr/lib/python2.7/base64.py", line 78, in b64decode
    raise TypeError(msg)
TypeError: Incorrect padding

        java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
        java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
        org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:55)
        com.google.cloud.dataflow.worker.fn.control.RegisterAndProcessBundleOperation.finish(RegisterAndProcessBundleOperation.java:274)
        com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:83)
        com.google.cloud.dataflow.worker.fn.control.BeamFnMapTaskExecutor.execute(BeamFnMapTaskExecutor.java:101)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1227)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:135)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:966)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction -1775: Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 127, in _execute
    response = task()
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 162, in <lambda>
    self._execute(lambda: worker.do_instruction(work), work)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 208, in do_instruction
    request.instruction_id)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 227, in process_bundle
    self.data_channel_factory)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 227, in __init__
    self.ops = self.create_execution_tree(self.process_bundle_descriptor)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 269, in create_execution_tree
    descriptor.transforms, key=topological_height, reverse=True)])
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 204, in wrapper
    result = cache[args] = func(*args)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 252, in get_operation
    in descriptor.transforms[transform_id].outputs.items()
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 251, in <dictcomp>
    for tag, pcoll_id
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 204, in wrapper
    result = cache[args] = func(*args)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 255, in get_operation
    transform_id, transform_consumers)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 359, in create_operation
    return creator(self, transform_id, transform_proto, payload, consumers)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 481, in create
    factory, transform_id, transform_proto, consumers, serialized_fn)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 529, in _create_pardo_operation
    dofn_data = pickler.loads(serialized_fn)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/internal/pickler.py", line 222, in loads
    c = base64.b64decode(encoded)
  File "/usr/lib/python2.7/base64.py", line 78, in b64decode
    raise TypeError(msg)
TypeError: Incorrect padding
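
From the bottom of the trace, the TypeError is raised by Python 2's base64 module: b64decode reports "Incorrect padding" when the encoded string's length is not a multiple of four. A minimal reproduction of just that error, unrelated to my pipeline code:

import base64

print(base64.b64decode('aGVsbG8gd29ybGQ='))   # decodes fine: 'hello world'

# Dropping the trailing '=' breaks the four-character alignment and raises the
# same error as in the worker log (TypeError on Python 2, binascii.Error on Python 3).
print(base64.b64decode('aGVsbG8gd29ybGQ'))

Since the decode fails inside pickler.loads while the worker is still building its execution tree, the worker never even reaches the Pub/Sub read, which matches the fact that the error shows up before I publish any messages.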

I haven't even ingested any data into Pub/Sub, but the above error appears as soon as I start running my pipeline in Cloud Dataflow.

Please let me know if I'm doing anything wrong here.


1 Answer


Writing to GCS from a streaming pipeline is not yet supported in the Beam Python SDK.
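
If you need the data in GCS in the meantime, one possible workaround is to avoid the unsupported sink entirely: window the stream, collect each window into a batch, and write the batch yourself with the google-cloud-storage client. This is only a sketch under assumptions (the bucket name, object prefix, 60-second window, and the WriteWindowToGCS DoFn are placeholders of mine; I have not run it against your setup):

import apache_beam as beam
from apache_beam.transforms import window
from apache_beam.transforms.combiners import ToListCombineFn


class WriteWindowToGCS(beam.DoFn):
    """Hypothetical DoFn: writes each windowed batch as one GCS object."""

    def __init__(self, bucket_name, prefix):
        self.bucket_name = bucket_name
        self.prefix = prefix

    def process(self, batch, win=beam.DoFn.WindowParam):
        from google.cloud import storage  # imported here so it resolves on the worker
        client = storage.Client()
        bucket = client.bucket(self.bucket_name)
        name = '{0}-{1}.txt'.format(self.prefix, win.end.to_utc_datetime().isoformat())
        bucket.blob(name).upload_from_string('\n'.join(batch))


p = beam.Pipeline(argv=argv)  # argv built the same way as in the question
(p
 | 'read_stream' >> beam.io.ReadStringsFromPubSub(
     subscription='projects/myprojectid-209306/subscriptions/mysub1')
 | 'window' >> beam.WindowInto(window.FixedWindows(60))  # one batch per minute
 | 'batch' >> beam.CombineGlobally(ToListCombineFn()).without_defaults()
 | 'write_batch' >> beam.ParDo(WriteWindowToGCS('bagdfs2', 'myout')))
p.run()

This trades the continuous WriteToText sink for per-window objects, so each minute of messages ends up in its own file under gs://bagdfs2/.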