I need to perform a very simple transformation on some data (extract a string from JSON), then write it to PubSub. I'm attempting to use a custom Python Dataflow job to do so.
I've written a job which successfully writes back to Cloud Storage, but my attempts at even the simplest possible write to PubSub (no transformation) result in an error: JOB_MESSAGE_ERROR: Workflow failed. Causes: Expected custom source to have non-zero number of splits.
Has anyone successfully written to PubSub from GCS via Dataflow?
Can anyone shed some light on what is going wrong here?
import argparse

import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions


def run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument('--input',
                        dest='input',
                        help='Input file to process.')
    parser.add_argument('--output',
                        dest='output',
                        help='Output file to write results to.')
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True
    with beam.Pipeline(options=pipeline_options) as p:
        lines = p | ReadFromText(known_args.input)
        output = lines  # Obviously not necessary but this is where my simple extract goes
        output | beam.io.WriteToPubSub(known_args.output)  # This doesn't work (raises the error above)
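For context, here is a minimal sketch of how I understand WriteToPubSub is meant to be invoked in the Python SDK: a full projects/<project>/topics/<topic> path, bytes payloads, and (as far as I can tell) streaming enabled on Dataflow. The project, bucket, and topic names below are placeholders, not my real setup:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

# Placeholder names for illustration only.
INPUT = 'gs://my-bucket/input/*.json'
TOPIC = 'projects/my-project/topics/my-topic'

options = PipelineOptions()
options.view_as(StandardOptions).streaming = True  # Pub/Sub writes in the Python SDK expect a streaming pipeline

with beam.Pipeline(options=options) as p:
    (p
     | 'Read' >> beam.io.ReadFromText(INPUT)
     | 'ToBytes' >> beam.Map(lambda line: line.encode('utf-8'))  # WriteToPubSub expects bytes elements
     | 'Publish' >> beam.io.WriteToPubSub(topic=TOPIC))

Note that this still reads from a bounded source inside a streaming job, which is exactly what the comment below suspects is the real problem.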
A later comment from the asker: "…the Write method in code, and the --streaming flag, works exactly as expected. So I now suspect that it's related to this issue I have since found: stackoverflow.com/questions/56277145/…, and that it's potentially because I'm trying to mangle together a bounded read and an unbounded write. I found a workaround by writing a Scala batch job, but I'm still very curious as to what this is all about!" – originalgriefster