
I am using Apache Beam with Python and would like to know the equivalent of the Java SDK's Wait.on() in the Python SDK.

Currently I am having a problem with the code snippet below:

    if len(output_pcoll) > 1:
        merged = (tuple(output_pcoll) |
                  'MergePCollections1' >> beam.Flatten())
    else:
        merged = output_pcoll[0]

    outlier_side_input = self.construct_outlier_side_input(merged)

    (merged |
     "RemoveOutlier" >>
     beam.ParDo(utils.Remove_Outliers(),
                beam.pvalue.AsDict(outlier_side_input)) |
     "WriteToCSV" >>
     beam.io.WriteToText('../../ML-DATA/{0}.{1}'.format(self.BUCKET,
                         self.OUTPUT), num_shards=1))

It seems that Apache Beam does not wait until self.construct_outlier_side_input has finished executing, which results in an empty side input when the "RemoveOutlier" step runs. In the Java SDK you can use Wait.on() to wait for construct_outlier_side_input to finish, but I could not find an equivalent method in the Python SDK.

Edit: What I am trying to achieve is almost the same as in this link: https://rmannibucau.metawerx.net/post/apache-beam-initialization-destruction-task

Welcome to Stack Overflow. I'm new to Python too; what I'd like you to confirm is that the issue really is about timing and nothing else. As I understand it, Python code executes synchronously, one statement after another, unless you implement multithreading yourself. You could first test whether your code behaves as expected if you wait x seconds before removing outliers, using time.sleep(x). You may also check this link: blog.miguelgrinberg.com/post/how-to-make-python-wait - Hasnaa Ibraheem
Can you explain your use case? Are you trying to remove outliers from the merged dataset? - Jayadeep Jayaraman
@HasnaaIbraheem Thanks, I tried that before posting here, but still no success. @jjayadeep Yes, I am trying to remove outliers from the merged dataset, but the outliers I am trying to remove are local outliers. From the merged dataset I construct a dictionary with the item name as the key and a list of its values as the value, and then I use that dictionary as a side input when removing outliers. - ruka
I assume this is a streaming pipeline? If so, what window are you applying to the side input data? - Reza Rokni
@RezaRokni No, it's not; this is a batch pipeline. - ruka

1 Answer


You can use Beam's additional outputs feature to do this.

A sample code snippet is as follows:

    results = (words | beam.ParDo(ProcessWords(), cutoff_length=2, marker='x')
               .with_outputs('above_cutoff_lengths', 'marked strings',
                             main='below_cutoff_strings'))
    below = results.below_cutoff_strings
    above = results.above_cutoff_lengths
    marked = results['marked strings']  # indexing works as well

The above snippet gives you multiple PCollections: below, above and marked. You can then use side inputs to further filter or join the results.
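To make the side-input step concrete for the use case described in the comments (a dictionary from item name to its values, used to drop local outliers), here is a minimal plain-Python sketch of the two functions such a pipeline would need. The names compute_bounds and is_not_outlier are hypothetical, `merged` is assumed to hold (item_name, value) pairs, and the outlier rule (mean plus or minus two population standard deviations) is an assumption, since the question does not say which rule Remove_Outliers applies. The Beam wiring is shown only in comments so the logic can be checked locally.

```python
import statistics
from collections import defaultdict


def compute_bounds(values, num_std=2.0):
    """Return (low, high) bounds for one item's values (assumed rule: mean +/- num_std * pstdev)."""
    mean = statistics.fmean(values)
    std = statistics.pstdev(values)
    return mean - num_std * std, mean + num_std * std


def is_not_outlier(element, bounds_by_item):
    """Keep an (item_name, value) pair only if it lies within its item's bounds."""
    item, value = element
    low, high = bounds_by_item[item]
    return low <= value <= high


# Sketch of how the helpers could be wired into Beam, assuming `merged`
# holds (item_name, value) pairs:
#   bounds = (merged
#             | 'GroupByItem' >> beam.GroupByKey()
#             | 'ComputeBounds' >> beam.MapTuple(
#                 lambda item, values: (item, compute_bounds(list(values)))))
#   cleaned = merged | 'RemoveOutliers' >> beam.Filter(
#       is_not_outlier, beam.pvalue.AsDict(bounds))

# Local check of the same logic without Beam.
records = [('a', 10)] * 9 + [('a', 100), ('b', 5), ('b', 7)]
grouped = defaultdict(list)
for item, value in records:
    grouped[item].append(value)
bounds_by_item = {item: compute_bounds(vs) for item, vs in grouped.items()}
cleaned = [r for r in records if is_not_outlier(r, bounds_by_item)]
print(cleaned)  # ('a', 100) is dropped; the other 11 pairs are kept
```

Grouping first and passing the per-item bounds with AsDict mirrors the dictionary-as-side-input approach described in the comments; swap in whatever outlier rule you actually need inside compute_bounds.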

Hope that helps.

Update

Based on the comments, I would like to mention that Apache Beam supports stateful processing through state cells such as ValueState and BagState (BagStateSpec in the Python SDK). If the requirement is to read through a PCollection and then make decisions based on whether a prior value is present, such requirements can be handled with BagState, as shown below:

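    import apache_beam as beam
    from apache_beam.coders import PickleCoder
    from apache_beam.transforms.timeutil import TimeDomain
    from apache_beam.transforms.userstate import BagStateSpec, TimerSpec, on_timer

    # State and timer specs; the names and the generic PickleCoder are placeholders.
    BUFFER_STATE_1 = BagStateSpec('buffer_1', PickleCoder())
    BUFFER_STATE_2 = BagStateSpec('buffer_2', PickleCoder())
    WATERMARK_TIMER = TimerSpec('watermark_timer', TimeDomain.WATERMARK)


    class StatefulDoFn(beam.DoFn):
        """Stateful DoFn; its input must be a keyed PCollection of (key, value) pairs."""

        def process(self,
                    element,
                    timestamp=beam.DoFn.TimestampParam,
                    window=beam.DoFn.WindowParam,
                    buffer_1=beam.DoFn.StateParam(BUFFER_STATE_1),
                    buffer_2=beam.DoFn.StateParam(BUFFER_STATE_2),
                    watermark_timer=beam.DoFn.TimerParam(WATERMARK_TIMER)):
            # Do your processing here.
            key, value = element
            # Read all the data from buffer_1.
            all_values_in_buffer_1 = list(buffer_1.read())

            if StatefulDoFn._is_clear_buffer_1_required(all_values_in_buffer_1):
                # Clear the buffer data if the required conditions are met.
                buffer_1.clear()

            # Add the value to buffer_2.
            buffer_2.add(value)

            # Arm the timer so on_expiry_1 fires once the watermark passes the end of the window.
            watermark_timer.set(window.end)

            if StatefulDoFn._all_condition_met():
                # Clear the timer if certain conditions are met and you don't
                # want to trigger the callback method.
                watermark_timer.clear()

            yield element

        @on_timer(WATERMARK_TIMER)
        def on_expiry_1(self,
                        timestamp=beam.DoFn.TimestampParam,
                        window=beam.DoFn.WindowParam,
                        key=beam.DoFn.KeyParam,
                        buffer_1=beam.DoFn.StateParam(BUFFER_STATE_1),
                        buffer_2=beam.DoFn.StateParam(BUFFER_STATE_2)):
            # Window and key parameters are really useful, especially for debugging issues.
            yield 'expired1'

        @staticmethod
        def _is_clear_buffer_1_required(values):
            # Placeholder condition; replace with your own logic.
            return False

        @staticmethod
        def _all_condition_met():
            # Placeholder condition; replace with your own logic.
            return False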
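Because Beam scopes state to each key (and window), the BagState logic above behaves roughly like the following plain-Python model, which flags whether a value has already been seen for the same key. This is a local illustration, not the Beam runtime: the function name flag_seen_before is hypothetical, a dict of lists stands in for one bag per key, and unlike this loop, a Beam runner does not guarantee the order in which elements for a key arrive.

```python
from collections import defaultdict


def flag_seen_before(elements):
    """Yield (key, value, seen_before) for each (key, value) pair.

    A dict of lists plays the role of one BagState per key; this is a local
    model for illustration, not the Beam runtime.
    """
    bags = defaultdict(list)  # one "bag" per key, like a key-scoped BagState
    for key, value in elements:
        seen_before = value in bags[key]  # analogous to inspecting buffer.read()
        bags[key].append(value)           # analogous to buffer.add(value)
        yield key, value, seen_before


print(list(flag_seen_before([('x', 1), ('y', 1), ('x', 1), ('x', 2)])))
# [('x', 1, False), ('y', 1, False), ('x', 1, True), ('x', 2, False)]
```

Note that ('y', 1) is not flagged even though 1 was already seen for 'x': each key has its own bag, which is the property that makes per-key decisions possible in a stateful DoFn.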