I'm trying to write Google PubSub messages to Google Cloud Storage using Google Cloud Dataflow (Python SDK). The messages arrive in PubSub in JSON format, and I have to define a schema in order to write them to Google Cloud Storage in Parquet format.
As suggested by other users, I started working on this task by looking in particular at this and this source.
The first one is not exactly what I want, because it changes the JSON messages: it merges them through a window, puts the original JSON into a "message" field, and adds a timestamp representing the time of publication.
The second source (source code here) fits the use case better. Specifically, a schema is automatically defined from data extracted from a BigQuery table, and the results are then written back to Google Cloud Storage in Parquet format.
Does anyone know whether it is possible to do the same here, i.e. to automatically define a schema using pyarrow by reading the JSON messages from PubSub? If it is possible, how can I do it?
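What I have in mind is a helper along the lines of the sketch below. This is only a sketch, not working code: get_parquet_schema is a name I made up, it pulls a single sample message (which requires a subscription rather than the topic the pipeline reads from), and it assumes that this one message is representative of the whole stream and that every value maps cleanly to a pyarrow type:

import json

import pyarrow
from google.cloud import pubsub_v1


def get_parquet_schema(subscription_path):
    """Hypothetical helper: infer a pyarrow schema from one sample message."""
    subscriber = pubsub_v1.SubscriberClient()
    response = subscriber.pull(subscription=subscription_path, max_messages=1)
    sample = json.loads(response.received_messages[0].message.data)
    # Map basic Python types to pyarrow types; fall back to string.
    type_map = {
        str: pyarrow.string(),
        int: pyarrow.int64(),
        float: pyarrow.float64(),
        bool: pyarrow.bool_(),
    }
    return pyarrow.schema(
        [(name, type_map.get(type(value), pyarrow.string()))
         for name, value in sample.items()])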
This is what I've done so far. When I run it, some Parquet files are generated (they contain the column names I specified in the pyarrow schema, but no values), and several errors show up in the Dataflow console (see one example below). In addition, when a single JSON message arrives in PubSub (which should be converted into a Parquet file with a single row), I don't understand why many Parquet files are generated (more than 10 if I leave the job running for a couple of minutes).
import argparse
import logging

import pyarrow

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


def run(input_topic, output_path, pipeline_args=None):
    # TODO - Dynamic parquet_schema definition
    # input_topic = known_args.input
    # parquet_schema = get_parquet_schema(input_topic)
    parquet_schema = pyarrow.schema(
        [('Attr1', pyarrow.string()), ('Attr2', pyarrow.string()),
         ('Attr3', pyarrow.string()), ('Attr4', pyarrow.string()),
         ('Attr5', pyarrow.string()), ('Attr6', pyarrow.string())
         ]
    )

    # instantiate a pipeline with all the pipeline options
    pipeline_options = PipelineOptions(pipeline_args, streaming=True)

    # processing and structure of the pipeline
    with beam.Pipeline(options=pipeline_options) as pipeline:
        (
            pipeline
            | 'Input: Read PubSub Messages' >> beam.io.ReadFromPubSub(topic=input_topic)
            | 'Output: Export to Parquet' >> beam.io.parquetio.WriteToParquet(
                file_path_prefix=output_path,
                schema=parquet_schema,
                file_name_suffix='.parquet')
        )


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    parser = argparse.ArgumentParser()
    parser.add_argument('--input_topic',
                        help='input pubsub topic to read data.')
    parser.add_argument('--output_path',
                        help='gcs output location for parquet files.')
    known_args, pipeline_args = parser.parse_known_args()
    run(
        known_args.input_topic,
        known_args.output_path,
        pipeline_args,
    )
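As a side note on the number of output files: my understanding (an assumption I have not been able to confirm) is that in streaming mode a file sink writes at least one shard per bundle/window, so an explicit window plus a fixed num_shards would presumably bound the file count. Something like this, replacing the pipeline body above (the 60-second duration is arbitrary):

from apache_beam.transforms import window

with beam.Pipeline(options=pipeline_options) as pipeline:
    (
        pipeline
        | 'Input: Read PubSub Messages' >> beam.io.ReadFromPubSub(topic=input_topic)
        # Group the unbounded stream into fixed 60-second windows so the
        # sink has finite panes to flush.
        | 'Window into Fixed Intervals' >> beam.WindowInto(window.FixedWindows(60))
        | 'Output: Export to Parquet' >> beam.io.parquetio.WriteToParquet(
            file_path_prefix=output_path,
            schema=parquet_schema,
            file_name_suffix='.parquet',
            # Force one file per window instead of one per bundle.
            num_shards=1)
    )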
This is the error generated by Dataflow:
Error message from worker:
java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction -1018: Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 961, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 726, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 814, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/iobase.py", line 1061, in process
    self.writer.write(element)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/filebasedsink.py", line 420, in write
    self.sink.write_record(self.temp_handle, value)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/parquetio.py", line 534, in write_record
    self._buffer[i].append(value[n])
TypeError: byte indices must be integers or slices, not str

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 245, in _execute
    response = task()
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 302, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 471, in do_instruction
    getattr(request, request_type), request.instruction_id)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 506, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 972, in process_bundle
    element.data)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 218, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 330, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 332, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 670, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 671, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 963, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1045, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "/usr/local/lib/python3.7/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback
    raise exc.with_traceback(traceback)
  File "apache_beam/runners/common.py", line 961, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 726, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 814, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/iobase.py", line 1061, in process
    self.writer.write(element)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/filebasedsink.py", line 420, in write
    self.sink.write_record(self.temp_handle, value)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/parquetio.py", line 534, in write_record
    self._buffer[i].append(value[n])
TypeError: byte indices must be integers or slices, not str [while running 'generatedPtransform-1004']

java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57)
org.apache.beam.runners.dataflow.worker.fn.control.RegisterAndProcessBundleOperation.finish(RegisterAndProcessBundleOperation.java:333)
org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:85)
org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor.execute(BeamFnMapTaskExecutor.java:123)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1369)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:154)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1088)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction -1018: Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 961, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 726, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 814, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/iobase.py", line 1061, in process
    self.writer.write(element)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/filebasedsink.py", line 420, in write
    self.sink.write_record(self.temp_handle, value)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/parquetio.py", line 534, in write_record
    self._buffer[i].append(value[n])
TypeError: byte indices must be integers or slices, not str

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 245, in _execute
    response = task()
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 302, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 471, in do_instruction
    getattr(request, request_type), request.instruction_id)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 506, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 972, in process_bundle
    element.data)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 218, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 330, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 332, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 670, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 671, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 963, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1045, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "/usr/local/lib/python3.7/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback
    raise exc.with_traceback(traceback)
  File "apache_beam/runners/common.py", line 961, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 726, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 814, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/iobase.py", line 1061, in process
    self.writer.write(element)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/filebasedsink.py", line 420, in write
    self.sink.write_record(self.temp_handle, value)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/parquetio.py", line 534, in write_record
    self._buffer[i].append(value[n])
TypeError: byte indices must be integers or slices, not str [while running 'generatedPtransform-1004']

org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:177)
org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:157)
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:251)
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:309)
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:292)
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:782)
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748)
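Reading the TypeError, my guess (only a guess) is that WriteToParquet indexes each element by column name (the self._buffer[i].append(value[n]) line in parquetio.py), while ReadFromPubSub hands it the raw bytes payload, which is exactly what "byte indices must be integers or slices, not str" would complain about. So presumably a decoding step is needed between the two, along these lines (json.loads accepts bytes directly on Python 3, so no explicit decode should be necessary):

import json

import apache_beam as beam

# Sketch of the step I assume is missing, placed between ReadFromPubSub and
# WriteToParquet: turn each raw payload (bytes) into the dict of column
# values that the parquet sink indexes by field name.
parsed = (
    pipeline
    | 'Input: Read PubSub Messages' >> beam.io.ReadFromPubSub(topic=input_topic)
    | 'Parse JSON payload' >> beam.Map(json.loads)
)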