3
votes

I am trying to define a graph for an Akka stream that contains a parallel processing flow (I am using Akka.NET, but this shouldn't matter). Imagine a data source of orders, where each order consists of an order ID and a list of products (order items). The workflow is as follows:

  1. Receive an order
  2. Broadcast the order to two flows: flow A will deal with the order items, flow B will deal with the order ID (some bookkeeping work)
  3. Flow A: split the collection of order items into individual elements, each to be processed separately
  4. Flow A: for each order item resulting from the split in the previous step, call some external service that looks up extra information (price, availability, etc.)
  5. Flow B: do some extra bookkeeping for the given order ID
  6. Merge flows A and B
  7. Send the merged data from the previous step to the sink, resulting in enriched order information

Steps 1 (Source.From), 2 (Broadcast), 4-5 (Map), 6 (Merge) and 7 (Sink) look OK. But how is the collection split implemented in Akka or reactive-streams terms? This is neither broadcasting nor flattening: a collection of N elements needs to be split into N independent substreams that will later be merged back. How is this achieved?
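At the collection level, the split-and-regroup I am after can be sketched in plain Scala (simplified `Order`/`Item` types here; the real per-item processing would happen between the two steps):

```scala
case class Item(sku: String)
case class Order(id: Int, items: List[Item])

val orders = List(
  Order(1, List(Item("a"), Item("b"))),
  Order(2, List(Item("c")))
)

// Step 3: split each order into one (orderId, item) element per item
val split = orders.flatMap(o => o.items.map(i => o.id -> i))

// Step 6: regroup the individually processed elements back by order ID
val regrouped = split
  .groupBy(_._1)
  .map { case (id, pairs) => Order(id, pairs.map(_._2)) }
```

The question is what the streaming equivalent of these two collection operations is.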


1 Answer

5
votes

I recommend doing it in one flow. I know two flows look cooler, but trust me, it's not worth it in terms of simplicity of design (I tried). You may write something like this:

import akka.stream.scaladsl.{Flow, Sink, Source, SubFlow}

import scala.collection.immutable
import scala.concurrent.Future

case class Item()

case class Order(items: List[Item])

val flow = Flow[Order]
  .mapAsync(4) { order =>
    Future {
      // Enrich the order itself here
      order
    }
  }
  .mapConcat { order =>
    // Split: emit one (order, item) pair per order item
    order.items.map(order -> _)
  }
  .mapAsync(4) { case (order, item) =>
    Future {
      // Enrich each item here
      order -> item
    }
  }
  // One substream per order; maxSubstreams must cover the number of distinct orders
  .groupBy(2, tuple => tuple._1)
  .fold[Map[Order, List[Item]]](immutable.Map.empty) {
    case (map, (order, item)) =>
      map.updated(order, map.getOrElse(order, Nil) :+ item)
  }
  .mapConcat { _.map { case (order, newItems) => order.copy(items = newItems) } }
  .mergeSubstreams // fold the per-order substreams back into a single Flow[Order]

But even this approach is bad. There are so many things that can go wrong, either with the code above or with your design. What will you do if enrichment of one of the order's items fails? What if enrichment of the order object fails? What should happen to your stream(s)?

If I were you, I'd have a Flow[Order] and process its items inside mapAsync, so at least it guarantees I don't end up with partially processed orders.
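A minimal sketch of that suggestion, using plain scala.concurrent (`enrichItem` is a hypothetical stand-in for the external service call): all items of one order are enriched inside a single Future via `Future.traverse`, which is exactly the shape `mapAsync` expects, so an order either comes out fully enriched or fails as a whole.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

implicit val ec: ExecutionContext = ExecutionContext.global

case class Item(sku: String, price: Option[Double] = None)
case class Order(id: String, items: List[Item])

// Hypothetical external lookup; here it just fills in a price
def enrichItem(item: Item): Future[Item] =
  Future(item.copy(price = Some(9.99)))

// The function you would pass to mapAsync: the whole order succeeds or fails together
def enrichOrder(order: Order): Future[Order] =
  Future.traverse(order.items)(enrichItem).map(items => order.copy(items = items))

val enriched = Await.result(enrichOrder(Order("42", List(Item("a"), Item("b")))), 5.seconds)
```

In the stream this becomes `Flow[Order].mapAsync(4)(enrichOrder)` with no splitting or regrouping at all.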