1
votes

This is a subsequent post of Akka Stream - Select Sink based on Element in Flow.

Assume I have multiple SQS queues I'd like to stream from. I'm using the AWS SQS Connector of Alpakka to create Source.

implicit val sqsClient: AmazonSQSAsync = ???
val queueUrls: List[String] = ???
val sources: List[Source[Message, NotUsed]] = queueUrls.map(url => SqsSource(url))

Now, I'd like to combine the sources to merge them. However, the Source.combine method doesn't support passing a list as parameter, but only support varargs.

def combine[T, U](first: Source[T, _], second: Source[T, _], rest: Source[T, _]*)(strategy: Int ⇒ Graph[UniformFanInShape[T, U], NotUsed])

Of course, I can finger type all sources parameters. But, the parameter will get pretty long if I have 10 source queues.

Is there a way to combine sources from a list of sources?

[Supplement]

As Ramon J Romero y Vigil pointed out, it's a better practice to keep stream "a thin veneer". In this particular case, however, I use single sqsClient for all the SqsSource initialization.

1

1 Answers

3
votes

You could use foldLeft to concatenate or merge the sources:

val sources: List[Source[Message, NotUsed]] = ???

val concatenated: Source[Message, NotUsed] = sources.foldLeft(Source.empty[Message])(_ ++ _)
// the same as sources.foldLeft(Source.empty[Message])(_ concat _)

val merged: Source[Message, NotUsed] = sources.foldLeft(Source.empty[Message])(_ merge _)

Alternatively, you could use Source.zipN with flatMapConcat:

val combined: Source[Message, NotUsed] = Source.zipN(sources).flatMapConcat(Source.apply)