
I have a GCS bucket from which I'm trying to read about 200k files and then write them to BigQuery. The problem is that I'm having trouble creating a PCollection that works well with the code. I'm following this tutorial for reference.

I have this code:

from __future__ import absolute_import

import argparse
import logging
import os

from past.builtins import unicode

import apache_beam as beam
from apache_beam.io import ReadFromText, ReadAllFromText
from apache_beam.io import WriteToText
from apache_beam.metrics import Metrics
from apache_beam.metrics.metric import MetricsFilter
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from google.cloud import storage

import regex as re

# storage_client = storage.Client()
# bucket = storage_client.get_bucket('mybucket')
#
# blobs = bucket.list_blobs()
# l=list(blobs)
# x=[y.name for y in l]
# c=x[1:]
# print(len(c))
files = ['gs://mybucket/_chunk1',
         'gs://mybucket/_chunk0']


class DataIngestion:
    """A helper class which contains the logic to translate the file into
    a format BigQuery will accept."""

    def parse_method(self, string_input):

        x="""{}""".format(string_input)
        rx = re.compile(r"""\{[^{}]+\}(*SKIP)(*FAIL)|,""")
        d = {}
        d['name'], d['date'], d['geometry'], d['value0'], d['value1'], d['value2']=rx.split(x)
        d['geometry']=d['geometry'].strip('"')

        return d

def run(argv=None):
    """Main entry point; defines and runs the pipeline."""

    data_ingestion = DataIngestion()
    p = beam.Pipeline(options=PipelineOptions())


    (p
     | 'Create PCollection' >> beam.Create(files)
     | 'Read from a File' >> beam.io.ReadAllFromText(skip_header_lines=1)
     | 'String To BigQuery Row' >> beam.Map(
         lambda s: data_ingestion.parse_method(s))
     | 'Write to BigQuery' >> beam.io.Write(
         beam.io.BigQuerySink(
             'mytable',
             dataset='mydataset',
             schema=myschema,
             create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
             write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)))

    result = p.run()
    result.wait_until_finish()


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()

The problem is that this code works perfectly when the files list has only one element. As soon as there is more than one element, the 'String To BigQuery Row' transform fails with error: nothing to repeat [while running 'String To BigQuery Row']. This is probably related to the regex module, but I can't figure out what's wrong, because the same code works fine when given a single file.

Edit: Strangely, it runs fine with the DirectRunner. I'm passing the requirements.txt file as described here.

This is how I'm executing the pipeline:

python streaming_inserts.py --runner=DataFlowRunner --project=my-project --temp_location=gs://temp/ --staging_location=gs://stage/  --requirements_file requirements.txt --disk_size_gb 1000 --region us-east1

My requirements.txt looks like this:

regex
google-cloud-storage

Also, according to the logs, the package is being installed.


1 Answer


The OP's comment made me realize my mistake: the intended library is regex, not Python's builtin re.

Using import regex as re is not only confusing to the reader, it is also what causes the builtin re library to throw the nothing to repeat error. This is because Dataflow does not save your main session by default.

When the code in your parsing function is executed on the workers, it doesn't have access to the re name you bound at pipeline-construction time. Normally this would fail with a NameError, but because you happen to be using a valid module name, the code resolves re to the builtin re library and runs the pattern with it instead.
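You can see the difference outside Beam entirely (a minimal sketch; the sample line below is made up, not your actual data): the pattern relies on the PCRE-style (*SKIP)(*FAIL) verbs, which the third-party regex module supports but the builtin re module rejects.

import re
import regex

pattern = r"""\{[^{}]+\}(*SKIP)(*FAIL)|,"""

# The third-party `regex` module understands (*SKIP)(*FAIL), so commas
# inside the {...} block are not used as split points.
print(regex.split(pattern, 'a,2018-01-01,"{1, 2}",0,1,2'))

# The builtin `re` module does not, and fails at compile time with the
# same error the Dataflow worker reports.
try:
    re.compile(pattern)
except re.error as err:
    print('builtin re:', err)  # nothing to repeat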

If you use import regex instead, you'll see NameError: name 'regex' is not defined, which is the real reason the code is failing. To get around this, either move the import statement into the parsing function itself, or pass --save_main_session as an option to the runner. See here for more details.
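For example (a rough sketch based on your posted code, not a tested Dataflow job), either of these avoids the problem:

# Option 1: import the dependency inside the method that runs on the
# workers, so it does not depend on the pickled main session.
class DataIngestion:
    """Translates one line into a dict BigQuery will accept."""

    def parse_method(self, string_input):
        import regex  # imported at call time, on the worker
        rx = regex.compile(r"""\{[^{}]+\}(*SKIP)(*FAIL)|,""")
        d = {}
        (d['name'], d['date'], d['geometry'],
         d['value0'], d['value1'], d['value2']) = rx.split(string_input)
        d['geometry'] = d['geometry'].strip('"')
        return d

# Option 2: keep the module-level import and ask Dataflow to pickle the
# main session, either on the command line:
#   python streaming_inserts.py ... --save_main_session
# or in code, using the SetupOptions the script already imports:
#   options = PipelineOptions()
#   options.view_as(SetupOptions).save_main_session = True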


Old answer:

While I can't tell which version of Python you're using, your suspicion about the regex seems correct. * is a special character that repeats whatever precedes it, and ( only opens a group, so in a pattern like (*SKIP) there is nothing for the * to repeat, hence the error.

In Python 3.7, the above expression doesn't even compile:

python -c 'import re; rx = re.compile(r"""\{[^{}]+\}(*SKIP)(*FAIL)|,""")'
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/home/ajp1/miniconda3/envs/b-building/lib/python3.7/re.py", line 234, in compile
    return _compile(pattern, flags)
  File "/home/ajp1/miniconda3/envs/b-building/lib/python3.7/re.py", line 286, in _compile
    p = sre_compile.compile(pattern, flags)
  File "/home/ajp1/miniconda3/envs/b-building/lib/python3.7/sre_compile.py", line 764, in compile
    p = sre_parse.parse(p, flags)
  File "/home/ajp1/miniconda3/envs/b-building/lib/python3.7/sre_parse.py", line 930, in parse
    p = _parse_sub(source, pattern, flags & SRE_FLAG_VERBOSE, 0)
  File "/home/ajp1/miniconda3/envs/b-building/lib/python3.7/sre_parse.py", line 426, in _parse_sub
    not nested and not items))
  File "/home/ajp1/miniconda3/envs/b-building/lib/python3.7/sre_parse.py", line 816, in _parse
    p = _parse_sub(source, state, sub_verbose, nested + 1)
  File "/home/ajp1/miniconda3/envs/b-building/lib/python3.7/sre_parse.py", line 426, in _parse_sub
    not nested and not items))
  File "/home/ajp1/miniconda3/envs/b-building/lib/python3.7/sre_parse.py", line 651, in _parse
    source.tell() - here + len(this))
re.error: nothing to repeat at position 11

Python 2.7.15 doesn't accept it either:

python2 -c 'import re; rx = re.compile(r"""\{[^{}]+\}(*SKIP)(*FAIL)|,""")'
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/usr/lib/python2.7/re.py", line 194, in compile
    return _compile(pattern, flags)
  File "/usr/lib/python2.7/re.py", line 251, in _compile
    raise error, v # invalid expression
sre_constants.error: nothing to repeat

While I don't know what strings you're trying to match, I suspect some of your characters need to be escaped. e.g. "\{[^{}]+\}(\*SKIP)(\*FAIL)|,"