3 votes

I used a slightly modified version of the wordcount example (https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/wordcount.py), replacing the process function with the following:

  def process(self, element):
    """Returns the list of words in this element.

    The element is a line of text. If the line is blank, that is counted too.

    Args:
      element: the line of text being processed.

    Returns:
      The list of words in the line.
    """
    # Local imports, so the function is self-contained on remote workers.
    import random
    import time

    n = random.randint(0, 1000)
    time.sleep(5)
    logging.getLogger().warning('PARALLEL START? ' + str(n))
    time.sleep(5)

    text_line = element.strip()
    if not text_line:
      self.empty_line_counter.inc(1)
    words = re.findall(r'[\w\']+', text_line, re.UNICODE)
    for w in words:
      self.words_counter.inc()
      self.word_lengths_counter.inc(len(w))
      self.word_lengths_dist.update(len(w))

    time.sleep(5)
    logging.getLogger().warning('PARALLEL END? ' + str(n))
    time.sleep(5)

    return words

The idea is to check that the step is being performed in parallel. The expected output would be, for instance:

PARALLEL START? 447
PARALLEL START? 994
PARALLEL END? 447
PARALLEL START? 351
PARALLEL START? 723
PARALLEL END? 994
PARALLEL END? 351
PARALLEL END? 723

However, the actual result is something like this, which indicates that the step is not running in parallel:

PARALLEL START? 447
PARALLEL END? 447
PARALLEL START? 994
PARALLEL END? 994
PARALLEL START? 351
PARALLEL END? 351
PARALLEL START? 723
PARALLEL END? 723

I've tried the DirectRunner with direct_num_workers set manually, as well as the DataflowRunner with multiple workers, to no avail. What can be done to ensure that this step actually runs in parallel?
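For reference, this is the kind of invocation I mean; a sketch of the question's own command line with the DirectRunner parallelism options (direct_num_workers and direct_running_mode are Beam DirectRunner options; multi_processing is one of its accepted modes, alongside in_memory and multi_threading):

```shell
# Sketch: wordcount with DirectRunner parallelism options spelled out.
python wordcount.py \
  --input_file gs://dataflow-samples/shakespeare/kinglear.txt \
  --output output/ \
  --direct_num_workers 4 \
  --direct_running_mode multi_processing
```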

Update: the multi-processing mode found here looks promising. However, on the Windows command line (python wordcount.py --region us-east1 --setup_file setup.py --input_file gs://dataflow-samples/shakespeare/kinglear.txt --output output/), I receive the following error when using it:

Exception in thread run_worker:
Traceback (most recent call last):
    File "C:\Users\User\AppData\Local\Programs\Python\Python37\lib\threading.py", line 926, in _bootstrap_inner
        self.run()
    File "C:\Users\User\AppData\Local\Programs\Python\Python37\lib\threading.py", line 870, in run
        self._target(*self._args, **self._kwargs)
    File "C:\Users\User\AppData\Local\Programs\Python\Python37\lib\site-packages\apache_beam\runners\portability\local_job_service.py", line 218, in run
        p = subprocess.Popen(self._worker_command_line, shell=True, env=env_dict)
    File "C:\Users\User\AppData\Local\Programs\Python\Python37\lib\subprocess.py", line 775, in __init__
        restore_signals, start_new_session)
    File "C:\Users\User\AppData\Local\Programs\Python\Python37\lib\subprocess.py", line 1119, in _execute_child
        args = list2cmdline(args)
    File "C:\Users\User\AppData\Local\Programs\Python\Python37\lib\subprocess.py", line 530, in list2cmdline
        needquote = (" " in arg) or ("\t" in arg) or not arg
TypeError: argument of type 'int' is not iterable
What input data are you using? - Pablo
I kept the default input for wordcount: storage.googleapis.com/dataflow-samples/shakespeare/… ('gs://dataflow-samples/shakespeare/kinglear.txt') - Nivaldo H
What is your Python Beam SDK version? - Nick_Kh
It was 2.19.0, and I've just updated it to 2.22.0. Thanks for the heads up. Unfortunately, the behavior is still the same. - Nivaldo H

1 Answer

1 vote

The standard Apache Beam wordcount example uses a very small input: gs://dataflow-samples/shakespeare/kinglear.txt is well under a megabyte, so it will not split into many bundles.

Apache Beam parallelizes work by splitting up the input data. For example, if you have many files, each file is consumed in parallel. If a single file is very large, Beam can split it into segments that are consumed in parallel.
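The splitting idea can be illustrated with a toy sketch. This is not Beam's actual implementation (its file sources handle this internally, based on a desired bundle size); it just shows how one large file can become several independent byte ranges that separate workers could consume:

```python
# Illustrative sketch: split a file of `total_size` bytes into byte-range
# "bundles" of at most `desired_bundle_size` bytes each.
def split_into_ranges(total_size, desired_bundle_size):
    ranges = []
    start = 0
    while start < total_size:
        end = min(start + desired_bundle_size, total_size)
        ranges.append((start, end))
        start = end
    return ranges

print(split_into_ranges(1_000_000, 256_000))
```

A tiny input yields a single range, i.e. a single bundle, which is why the log lines never interleave.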

You are correct that your code should eventually show parallelism happening - but try with a (significantly) larger input.
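For a quick local test you can manufacture a larger input yourself. A minimal sketch (the file name big_input.txt is just an example):

```python
# Create a multi-megabyte input file so the runner has enough data to
# split into several bundles.
line = "the quick brown fox jumps over the lazy dog\n"
with open("big_input.txt", "w") as f:
    for _ in range(200000):  # roughly 9 MB of text
        f.write(line)
```

Pointing the pipeline at a file like this (instead of kinglear.txt) should make the PARALLEL START/END log lines start to interleave.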