I'm trying to learn my way through Cloud Dataflow. For the purpose of learning, I've broken down their basic Word Count example to a simple strip function. I want to create a PCollection of file names that are GCS objects. I get the message saying that the function ReadFromText() is not iterable.

My understanding is that a PCollection is a list of objects to be worked on. I could write a loop that feeds in each object one by one, but that's not what I want to do. I want to keep that part dynamic and let Apache Beam handle the rest. I only want to supply a list of files in GCS.

So far, I have been successful at processing single-element PCollections. I also do not want to use a glob pattern such as 'gs://dataflow-samples/shakespeare/*'.

I've also looked at the gcsio module and ReadAllFromText(). They also report that the function is not iterable. Please guide.

Here's what I've done until now:

"""A word-counting workflow."""

from __future__ import absolute_import

import argparse
import logging
import re

from past.builtins import unicode

import apache_beam as beam
from apache_beam.io import ReadFromText, ReadAllFromText
from apache_beam.io import WriteToText
from apache_beam.metrics import Metrics
from apache_beam.metrics.metric import MetricsFilter
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.io.gcp import gcsio


class WordExtractingDoFn(beam.DoFn):
  """Parse each line of input text into words."""

  def __init__(self):
    super(WordExtractingDoFn, self).__init__()
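  def process(self, element):
    # Yield the stripped line as a single element; a bare string returned
    # from process() would be iterated character by character.
    text_line = element.strip()
    yield text_line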


def run(argv=None):
  """Main entry point; defines and runs the wordcount pipeline."""

  p = beam.Pipeline(options=PipelineOptions())

  # Read the text file[pattern] into a PCollection.
  elements =                ['gs://dataflow-samples/shakespeare/1kinghenryiv.txt',
                            'gs://dataflow-samples/shakespeare/1kinghenryvi.txt',
                            'gs://dataflow-samples/shakespeare/2kinghenryiv.txt',
                            'gs://dataflow-samples/shakespeare/2kinghenryvi.txt',
                            'gs://dataflow-samples/shakespeare/3kinghenryvi.txt',
                            'gs://dataflow-samples/shakespeare/allswellthatendswell.txt',
                            'gs://dataflow-samples/shakespeare/antonyandcleopatra.txt',
                            'gs://dataflow-samples/shakespeare/asyoulikeit.txt',
                            'gs://dataflow-samples/shakespeare/comedyoferrors.txt',
                            'gs://dataflow-samples/shakespeare/coriolanus.txt',
                            'gs://dataflow-samples/shakespeare/cymbeline.txt',
                            'gs://dataflow-samples/shakespeare/hamlet.txt',
                            'gs://dataflow-samples/shakespeare/juliuscaesar.txt',
                            'gs://dataflow-samples/shakespeare/kinghenryv.txt',
                            'gs://dataflow-samples/shakespeare/kinghenryviii.txt',
                            'gs://dataflow-samples/shakespeare/kingjohn.txt',
                            'gs://dataflow-samples/shakespeare/kinglear.txt',
                            'gs://dataflow-samples/shakespeare/kingrichardii.txt',
                            'gs://dataflow-samples/shakespeare/kingrichardiii.txt',
                            'gs://dataflow-samples/shakespeare/loverscomplaint.txt',
                            'gs://dataflow-samples/shakespeare/loveslabourslost.txt',
                            'gs://dataflow-samples/shakespeare/macbeth.txt',
                            'gs://dataflow-samples/shakespeare/measureforemeasure.txt',
                            'gs://dataflow-samples/shakespeare/merchantofvenice.txt',
                            'gs://dataflow-samples/shakespeare/merrywivesofwindsor.txt',
                            'gs://dataflow-samples/shakespeare/midsummersnightsdream.txt',
                            'gs://dataflow-samples/shakespeare/muchadoaboutnothing.txt',
                            'gs://dataflow-samples/shakespeare/othello.txt',
                            'gs://dataflow-samples/shakespeare/periclesprinceoftyre.txt',
                            'gs://dataflow-samples/shakespeare/rapeoflucrece.txt',
                            'gs://dataflow-samples/shakespeare/romeoandjuliet.txt',
                            'gs://dataflow-samples/shakespeare/sonnets.txt',
                            'gs://dataflow-samples/shakespeare/tamingoftheshrew.txt',
                            'gs://dataflow-samples/shakespeare/tempest.txt',
                            'gs://dataflow-samples/shakespeare/timonofathens.txt',
                            'gs://dataflow-samples/shakespeare/titusandronicus.txt',
                            'gs://dataflow-samples/shakespeare/troilusandcressida.txt',
                            'gs://dataflow-samples/shakespeare/twelfthnight.txt',
                            'gs://dataflow-samples/shakespeare/twogentlemenofverona.txt',
                            'gs://dataflow-samples/shakespeare/various.txt',
                            'gs://dataflow-samples/shakespeare/venusandadonis.txt',
                            'gs://dataflow-samples/shakespeare/winterstale.txt']

  books = p | beam.Create((elements))
  #print (books)

  lines = p | 'read' >> ReadFromText(books)

  counts = (lines
            | 'split' >> (beam.ParDo(WordExtractingDoFn())
                          .with_output_types(unicode)))

  output = counts | 'write' >> WriteToText('gs://ihopeitworks/Users/see.txt',shard_name_template='')

  result = p.run()
  result.wait_until_finish()


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()
My understanding is that a PCollection is not an iterable. Let us think about what this means. While it is true that a PCollection is a collection of items, and one might intuitively think that at any time I can ask its size and iterate over it, the reality is that a PCollection is a logical collection and doesn't actually contain any items at any given time. If you want to iterate over a PCollection, execute a ParDo, which performs a function over each member of the collection that is eventually seen. - Kolban
In other words, I would have to write a ParDo if I want to create something that takes in a bucket name and runs some functions over its objects? What should I do so that, in the above code, ReadFromText takes in the file names and not the strings as is? I thought I could write a lambda function like the one here [github.com/GoogleCloudPlatform/training-data-analyst/blob/…]. - Pranay Nanda
I'm not following the story, and it's likely to be my fault. Can you describe the puzzle you face in conceptual terms rather than the concrete implementation? I get the sense that you want to create a PCollection from the content of an input list of file names? - Kolban
Yes. That's right. And also do a ReadFromText() on that PCollection. - Pranay Nanda

1 Answer

You were pretty close. ReadFromText expects a file pattern string when the pipeline is constructed, so it cannot take the books PCollection as a parameter. Instead, pipe the books PCollection into ReadAllFromText, which reads every file named in its input, as below. Hope that helps.

books = p | beam.Create((elements))
lines = books | 'read' >> ReadAllFromText()