
I'm trying to write Google PubSub messages to Google Cloud Storage using Google Cloud Dataflow (Python SDK). The messages arrive in PubSub in JSON format, and I have to define a schema in order to write them to Google Cloud Storage in Parquet format.

As suggested by other users, I started working on this task by looking in particular at this and this source.
The first one is not exactly what I want to do, because it changes the JSON files (it merges them through a window, puts the original JSON into a "message" field and adds a timestamp representing the publication time).
The second source (source code here) fits the use case better. Specifically, a schema is defined automatically from data extracted from a BigQuery table, and the results are then written back to Google Cloud Storage in Parquet format.
Does anyone know whether it is possible to do the same here, i.e. to automatically define a schema with pyarrow from the JSON messages read from PubSub? If so, how can I do it?
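
To make the idea concrete, this is roughly the kind of helper I have in mind for the TODO in the code below (only a rough, untested sketch of mine; infer_parquet_schema and sample_message are placeholder names, and it assumes a flat JSON object whose first message is representative):

    import json
    import pyarrow

    def infer_parquet_schema(sample_message):
        """Derive a pyarrow schema from one sample JSON message (flat objects only).

        Nested fields and a proper type mapping are exactly the open part
        of the question.
        """
        record = json.loads(sample_message)
        type_map = {str: pyarrow.string(), int: pyarrow.int64(),
                    float: pyarrow.float64(), bool: pyarrow.bool_()}
        return pyarrow.schema(
            [(name, type_map.get(type(value), pyarrow.string()))
             for name, value in record.items()])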

This is what I've done so far. If I try to run it, some Parquet files are generated (they contain the column names I specified through the pyarrow schema, but no values), and several errors appear in the Dataflow console (see one example below). In addition, when only one JSON message arrives in PubSub (which should be converted to a Parquet file with a single row), I don't understand why many Parquet files are generated (more than 10 if I leave the job running for a couple of minutes).


    import argparse
    import logging
    import pyarrow
    
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions
    
    def run(input_topic, output_path, pipeline_args=None):
        # TODO - Dynamic parquet_schema definition
        # input_topic = known_args.input
        # parquet_schema = get_parquet_schema(input_topic)
    
        parquet_schema = pyarrow.schema(
            [('Attr1', pyarrow.string()), ('Attr2', pyarrow.string()),
             ('Attr3', pyarrow.string()), ('Attr4', pyarrow.string()),
             ('Attr5', pyarrow.string()), ('Attr6', pyarrow.string())
             ]
        )
    
        # instantiate a pipeline with all the pipeline options
        pipeline_options = PipelineOptions(pipeline_args, streaming=True)
    
        # processing and structure of pipeline
        with beam.Pipeline(options=pipeline_options) as pipeline:
            (
                pipeline
                | 'Input: Read PubSub Messages' >> beam.io.ReadFromPubSub(topic=input_topic)
                | 'Output: Export to Parquet' >> beam.io.parquetio.WriteToParquet(
                    file_path_prefix=output_path,
                    schema=parquet_schema,
                    file_name_suffix='.parquet')
            )
    
    
    if __name__ == '__main__':
        logging.getLogger().setLevel(logging.INFO)
    
        parser = argparse.ArgumentParser()
        parser.add_argument('--input_topic',
                            help='input pubsub topic to read data.',)
        parser.add_argument('--output_path',
                            help='gcs output location for parquet files.',)
        known_args, pipeline_args = parser.parse_known_args()
    
        run(
            known_args.input_topic,
            known_args.output_path,
            pipeline_args,
        )

This is the error generated by Dataflow:


    Error message from worker: 
    java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction -1018: Traceback (most recent call last): 
    File "apache_beam/runners/common.py", line 961, in apache_beam.runners.common.DoFnRunner.process 
    File "apache_beam/runners/common.py", line 726, in apache_beam.runners.common.PerWindowInvoker.invoke_process 
    File "apache_beam/runners/common.py", line 814, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window 
    File "/usr/local/lib/python3.7/site-packages/apache_beam/io/iobase.py", line 1061, in process self.writer.write(element) 
    File "/usr/local/lib/python3.7/site-packages/apache_beam/io/filebasedsink.py", line 420, in write self.sink.write_record(self.temp_handle, value) 
    File "/usr/local/lib/python3.7/site-packages/apache_beam/io/parquetio.py", line 534, in write_record self._buffer[i].append(value[n]) 
    TypeError: byte indices must be integers or slices, not str 
    
    During handling of the above exception, another exception occurred: 
    
    Traceback (most recent call last): 
    File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 245, in _execute response = task() 
    File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 302, in <lambda> lambda: self.create_worker().do_instruction(request), request) 
    File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 471, in do_instruction getattr(request, request_type), request.instruction_id) 
    File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 506, in process_bundle bundle_processor.process_bundle(instruction_id)) 
    File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 972, in process_bundle element.data) 
    File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 218, in process_encoded self.output(decoded_value) 
    File "apache_beam/runners/worker/operations.py", line 330, in apache_beam.runners.worker.operations.Operation.output 
    File "apache_beam/runners/worker/operations.py", line 332, in apache_beam.runners.worker.operations.Operation.output 
    File "apache_beam/runners/worker/operations.py", line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive 
    File "apache_beam/runners/worker/operations.py", line 670, in apache_beam.runners.worker.operations.DoOperation.process 
    File "apache_beam/runners/worker/operations.py", line 671, in apache_beam.runners.worker.operations.DoOperation.process 
    File "apache_beam/runners/common.py", line 963, in apache_beam.runners.common.DoFnRunner.process 
    File "apache_beam/runners/common.py", line 1045, in apache_beam.runners.common.DoFnRunner._reraise_augmented 
    File "/usr/local/lib/python3.7/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback raise exc.with_traceback(traceback) 
    File "apache_beam/runners/common.py", line 961, in apache_beam.runners.common.DoFnRunner.process 
    File "apache_beam/runners/common.py", line 726, in apache_beam.runners.common.PerWindowInvoker.invoke_process 
    File "apache_beam/runners/common.py", line 814, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window 
    File "/usr/local/lib/python3.7/site-packages/apache_beam/io/iobase.py", line 1061, in process self.writer.write(element) 
    File "/usr/local/lib/python3.7/site-packages/apache_beam/io/filebasedsink.py", line 420, in write self.sink.write_record(self.temp_handle, value) 
    File "/usr/local/lib/python3.7/site-packages/apache_beam/io/parquetio.py", line 534, in write_record self._buffer[i].append(value[n]) 
    TypeError: byte indices must be integers or slices, not str [while running 'generatedPtransform-1004'] 
    
    java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) 
    java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) 
    org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57) 
    org.apache.beam.runners.dataflow.worker.fn.control.RegisterAndProcessBundleOperation.finish(RegisterAndProcessBundleOperation.java:333) 
    org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:85) 
    org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor.execute(BeamFnMapTaskExecutor.java:123) 
    org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1369) 
    org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:154) 
    org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1088) 
    java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) java.lang.Thread.run(Thread.java:748) 
    Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction -1018: Traceback (most recent call last): 
    File "apache_beam/runners/common.py", line 961, in apache_beam.runners.common.DoFnRunner.process 
    File "apache_beam/runners/common.py", line 726, in apache_beam.runners.common.PerWindowInvoker.invoke_process 
    File "apache_beam/runners/common.py", line 814, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window 
    File "/usr/local/lib/python3.7/site-packages/apache_beam/io/iobase.py", line 1061, in process self.writer.write(element) 
    File "/usr/local/lib/python3.7/site-packages/apache_beam/io/
    filebasedsink.py", line 420, in write self.sink.write_record(self.temp_handle, value) 
    File "/usr/local/lib/python3.7/site-packages/apache_beam/io/parquetio.py", line 534, in write_record self._buffer[i].append(value[n]) 
    TypeError: byte indices must be integers or slices, not str 
    
    During handling of the above exception, another exception occurred: 
    
    Traceback (most recent call last): 
    File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 245, in _execute response = task() 
    File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 302, in <lambda> lambda: self.create_worker().do_instruction(request), request) 
    File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 471, in do_instruction getattr(request, request_type), request.instruction_id) 
    File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 506, in process_bundle bundle_processor.process_bundle(instruction_id)) 
    File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 972, in process_bundle element.data) 
    File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 218, in process_encoded self.output(decoded_value) 
    File "apache_beam/runners/worker/operations.py", line 330, in apache_beam.runners.worker.operations.Operation.output 
    File "apache_beam/runners/worker/operations.py", line 332, in apache_beam.runners.worker.operations.Operation.output 
    File "apache_beam/runners/worker/operations.py", line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive 
    File "apache_beam/runners/worker/operations.py", line 670, in apache_beam.runners.worker.operations.DoOperation.process 
    File "apache_beam/runners/worker/operations.py", line 671, in apache_beam.runners.worker.operations.DoOperation.process 
    File "apache_beam/runners/common.py", line 963, in apache_beam.runners.common.DoFnRunner.process 
    File "apache_beam/runners/common.py", line 1045, in apache_beam.runners.common.DoFnRunner._reraise_augmented 
    File "/usr/local/lib/python3.7/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback raise exc.with_traceback(traceback) 
    File "apache_beam/runners/common.py", line 961, in apache_beam.runners.common.DoFnRunner.process 
    File "apache_beam/runners/common.py", line 726, in apache_beam.runners.common.PerWindowInvoker.invoke_process 
    File "apache_beam/runners/common.py", line 814, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window 
    File "/usr/local/lib/python3.7/site-packages/apache_beam/io/iobase.py", line 1061, in process self.writer.write(element) 
    File "/usr/local/lib/python3.7/site-packages/apache_beam/io/
    filebasedsink.py", line 420, in write self.sink.write_record(self.temp_handle, value) 
    File "/usr/local/lib/python3.7/site-packages/apache_beam/io/parquetio.py", line 534, in write_record self._buffer[i].append(value[n]) 
    TypeError: byte indices must be integers or slices, not str [while running 'generatedPtransform-1004'] 
    
    org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:177) 
    org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:157) 
    org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:251) 
    org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33) 
    org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76) 
    org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:309) 
    org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:292) 
    org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:782) 
    org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) 
    org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123) 
    java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) java.lang.Thread.run(Thread.java:748)

Can you share which errors are thrown by Dataflow? - rmesteves
@rmesteves added in the post. - Federico Barusco
Try changing your pipeline to: pipeline | 'Input: Read PubSub Messages' >> beam.io.ReadFromPubSub(topic=input_topic) | 'decode' >> beam.Map(lambda x: x.decode('utf-8')) | 'Output: Export to Parquet' >> beam.io.parquetio.WriteToParquet(file_path_prefix=output_path, schema=parquet_schema, file_name_suffix='.parquet') - rmesteves
This extra step I added should decode the PubSub message before saving it to Parquet. - rmesteves
Thanks @rmesteves for the suggestion, but the problem remains: even if only one event is generated (so only one JSON message coming from PubSub), many Parquet files are created and they are all empty (there is only the header). - Federico Barusco

1 Answer


Sorry this gave you such an ugly error message! This looks like exactly the kind of error we'll be able to catch early once more transforms have typing support (see https://beam.apache.org/blog/python-typing/ for more info).

The ParquetIO sink expects an input PCollection whose elements are dictionaries, but the PubSub source outputs a PCollection of bytes. You'll need to add a transform that parses the payload bytes into a dictionary. Something like this:

    import json  # needed for json.loads below

    (pipeline
     | 'Input: Read PubSub Messages' >> beam.io.ReadFromPubSub(topic=input_topic)
     | '*** Parse JSON -> dict ***' >> beam.Map(json.loads)
     | 'Output: Export to Parquet' >> beam.io.parquetio.WriteToParquet(
           file_path_prefix=output_path,
           schema=parquet_schema,
           file_name_suffix='.parquet')
    )
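
As a side note, the root cause is easy to reproduce outside Beam. The snippet below is only an illustration with a made-up payload: WriteToParquet looks up each element by column name, which fails on the raw bytes delivered by PubSub but works once json.loads has turned the payload into a dict.

    import json

    # A PubSub payload reaches the pipeline as raw bytes (made-up example message).
    payload = b'{"Attr1": "a", "Attr2": "b"}'

    # WriteToParquet indexes each element by column name, e.g. element['Attr1'].
    # On bytes this raises the same error seen in the Dataflow logs.
    try:
        payload['Attr1']
    except TypeError as err:
        print(err)  # byte indices must be integers or slices, not str

    # After json.loads the element is a dict, so the same lookup succeeds.
    record = json.loads(payload)
    print(record['Attr1'])  # prints: a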