2
votes

I am trying to read a JSON file (multi-line) in the pipeline but beam.io.ReadFromText(somefile.json reads one line at a time.

I am trying to read the content of the file as JSON so that I can apply map on each category to download relevant products file.

This is how my JSON file (productindex.json) looks like:

{
  "productcategories" : {
    "category1" : {
      "productfile" : "http://products.somestore.com/category1/products.json"
    },
    "category2" : {
      "productfile" : "http://products.somestore.com/category2/products.json"
    },
    "category3" : {
      "productfile" : "http://products.somestore.com/category3/products.json"
    },
    "category4" : {
      "productfile" : "http://products.somestore.com/category4/products.json"
    }
}

This is how the beginning of my pipeline looks like:

with beam.Pipeline(options=pipeline_options) as p:
    rows = (
        p | beam.io.ReadFromText(
            "http://products.somestore.com/allproducts/productindex.json")
    )

I am using apache-beam[gcp] module.

How do I achieve this?

1
I'm currently using the Java SDK of Apache Beam, but had the same issue. I solved my issue with jsonString.replaceAll("\\R", " "). That regex will detect newline and return characters. This replacement will flatten your json into a single line. In Python it would be something like jsonString.replace("\n\r", " ").rocksNwaves

1 Answers

2
votes

Apache Beam / Cloud Dataflow does not directly support reading multi-line Json data.

The primary reason is that this is very hard to do in parallel. How does Beam know where each record ends? This is easy for a single reader, but very complicated for parallel readers.

The best solution that I can recommend is to convert your Json data into Newline-delimited Json (NDJSON) before processing by Beam / Dataflow. This may be as simple as changing the output format written by the upstream task or may require pre-processing.