2 votes

My Apache Beam pipeline takes an infinite stream of messages. Each message fans out into N elements (N is ~1000 and differs for each input). Then a map operation is applied to each element produced by the previous stage, yielding N new elements, which should be reduced using a top-1 operation (elements are grouped by the original message that was read from the queue). The result of the top-1 is saved to external storage. In Spark I can easily do this by reading messages from the stream and creating an RDD for each message that does map + reduce. Since Apache Beam does not have nested pipelines, I can't see a way to implement this in Beam with an infinite stream input. Example:

Infinite stream elements: A, B

Step 1 (fan out, N = 3): A -> A1, A2, A3
                (N = 2): B -> B1, B2

Step 2 (map): A1, A2, A3 -> A1', A2', A3'
              B1, B2     -> B1', B2'

Step 3 (top1): A1', A2', A3' -> A2'
               B1', B2' -> B2'

Output: A2', B2'

There is no dependency between the A and B elements. A2' and B2' are the top elements within their respective groups. The stream is infinite. The map operation can take anywhere from a couple of seconds to a couple of minutes. Setting a window watermark to the maximum time the map operation can take would make the overall pipeline much slower for fast map operations. A nested pipeline would help, because that way I could create a pipeline per message.
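
For reference, here is a minimal sketch of the Spark version I have in mind (the socket source, the fixed N = 3 fan-out, and the lexicographic "top" criterion are illustrative placeholders, not my real job):

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;

public class TopPerMessage {
  public static void main(String[] args) throws InterruptedException {
    SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("top-per-message");
    JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(1));

    // Stand-in stream source; the real input is a message queue.
    JavaDStream<String> messages = ssc.socketTextStream("localhost", 9999);

    // One map + reduce per micro-batch of messages.
    messages.foreachRDD(rdd ->
        rdd.flatMapToPair(msg -> Arrays.asList(               // Step 1: fan out, N = 3
                new Tuple2<>(msg, msg + "1"),
                new Tuple2<>(msg, msg + "2"),
                new Tuple2<>(msg, msg + "3")).iterator())
           .mapValues(e -> e + "'")                           // Step 2: map each element
           .reduceByKey((a, b) -> a.compareTo(b) > 0 ? a : b) // Step 3: top 1 per message
           .foreach(kv -> System.out.println(kv._2())));      // stand-in for external storage

    ssc.start();
    ssc.awaitTermination();
  }
}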

You can accomplish this in Beam; however, you would need to use windowing along with triggers. That being said, for your infinite stream, do you want to keep track of the top across all the data seen in the stream, or only within a particular window (in your case when N = 3 and later when N = 6)? I could write a minimal example if you could explain a bit more about your desired grouping/mapping operations. - Haris Nadeem
@HarisNadeem Clarified the question to say that the top should be calculated per group - one element for A and one for B. The top result should be emitted as soon as it's available. The total time it takes to get from A to A2' can range from 30 seconds to 10 minutes. - Nutel
I see; your use case makes more sense now, since I was confused about what a message was defined as. This is still doable in Apache Beam, but there are two ways to do it. Since each message is independent of the others, you simply need to do a map-reduce on each message. But since you want to write to storage, you would need windowing; otherwise you'll never close the file descriptor. So the question would be how to implement it with the least computing time and the least waste of resources. Correct? If so, I can scale a solution up for you. - Haris Nadeem
Another way (it's more of a hack) would be to use the storage API and manually write to a file in your DoFn; you would then need to manually close the file descriptor at the end of the DoFn. This way you don't need windowing and would just be running an infinite stream. - Haris Nadeem
@HarisNadeem Right, it's a map-reduce on each message, which makes it not a typical data stream. The overhead associated with windowing should be as close to zero as possible, meaning that flushing the result from a window every minute would not be acceptable. I'm curious about both the windowing and the "hacky" solution, though. - Nutel

2 Answers

0 votes

It doesn't seem like you'd need a 'nested pipeline' for this. Let me show you what it looks like in the Beam Python SDK (it's similar for Java).

For example, for the dummy operation of appending a number and an apostrophe to a string (e.g. "A" => "A1'"), you'd do something like this:

import apache_beam as beam

def my_fn(value):
  def _inner(elm):
    return (elm, elm + str(value) + "'")  # A KV pair keyed by the original element
  return _inner

# my_stream has [A, B]; label each Map so the transform names are unique.
pcoll_1 = (my_stream
           | 'map_1' >> beam.Map(my_fn(1)))
pcoll_2 = (my_stream
           | 'map_2' >> beam.Map(my_fn(2)))
pcoll_3 = (my_stream
           | 'map_3' >> beam.Map(my_fn(3)))

def top_1(grouped):
  # CoGroupByKey yields (key, (iterable_1, iterable_2, iterable_3)).
  key, groups = grouped
  best = max(v for group in groups for v in group)  # placeholder "top" criterion
  return (key, best)

result = ((pcoll_1, pcoll_2, pcoll_3)
          | beam.CoGroupByKey()
          | beam.Map(top_1))

0 votes

So here is a sort-of-working solution. I will most likely be editing it to fix any mistakes I make in understanding the question. (P.S. the template code is in Java.) Assuming that input is your stream source:

PCollection<Messages> msgs = input.apply(
    Window.<Messages>into(FixedWindows.of(Duration.standardSeconds(1)))
        .triggering(AfterWatermark.pastEndOfWindow()
            // fire the moment you see an element
            .withEarlyFirings(AfterPane.elementCountAtLeast(1))
            // optional, since you have a small window
            .withLateFirings(AfterProcessingTime.pastFirstElementInPane()))
        .withAllowedLateness(Duration.standardMinutes(60))
        .discardingFiredPanes());

This would allow you to read a stream of Messages, which could be a String, a HashMap, or even a List. Observe that you are telling Beam to fire a pane for every element it receives, with a maximum window size of 1 second. You can change this if you want to fire, say, every 10 messages, with a window of a minute, etc.

After that, you would need to write two classes that extend DoFn:

PCollection<Element> top = msgs.apply(ParDo.of(new ExtractElements()))
                               .apply(ParDo.of(new TopElement()));

Where Element can be a String, an int, a double, etc. A rough sketch of what that could look like follows below.
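
For illustration, here is a hypothetical sketch of ExtractElements, assuming String messages and Double-valued elements (both placeholders), with the top-1 step expressed via Beam's built-in Top.largestPerKey combine as one way to realize the TopElement idea:

import java.util.List;

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Top;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

// Steps 1 + 2: fan each message out into N mapped elements, keyed by the message.
static class ExtractElements extends DoFn<String, KV<String, Double>> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    String msg = c.element();
    for (int i = 1; i <= 3; i++) {                // N = 3 here; it varies per message
      c.output(KV.of(msg, expensiveMap(msg, i))); // placeholder map operation
    }
  }

  private static double expensiveMap(String msg, int i) {
    return msg.hashCode() * (double) i;           // stand-in for the real computation
  }
}

// Step 3: top 1 per original message, using the built-in Top combine.
PCollection<KV<String, List<Double>>> top =
    msgs.apply(ParDo.of(new ExtractElements()))
        .apply(Top.largestPerKey(1));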

Finally, you would write each Element to storage with:

top.apply(ParDo.of(new ParsetoString()))
   .apply(TextIO.write().withWindowedWrites()
                        .withNumShards(1)
                        .to(filename));

Therefore, you would end up with roughly one file for every message, which may be a lot. Sadly, you cannot append to an existing file, unless you use windowing to group all the elements into one list and write that out in a single shot, as sketched below.
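
A minimal sketch of that grouping variant (the constant key and the newline join are just one way to do it):

// Batch everything in a window under one constant key, then emit a single
// record per window, so each window produces one write instead of many.
PCollection<String> batched =
    top.apply(ParDo.of(new ParsetoString()))
       .apply(WithKeys.of("all"))
       .apply(GroupByKey.create())
       .apply(MapElements.via(
           new SimpleFunction<KV<String, Iterable<String>>, String>() {
             @Override
             public String apply(KV<String, Iterable<String>> kv) {
               return String.join("\n", kv.getValue());
             }
           }));

batched.apply(TextIO.write().withWindowedWrites()
                            .withNumShards(1)
                            .to(filename));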

Of course, there is the hacky way to do it without windowing, which I will explain if this use case does not work out for you (or if you are curious).

Let me know if I missed anything! :)