I have the following code snippet:
case class SomeClass(param1:String,param2:String,param3:String)
val someClassActorSource: Source[SomeClass, ActorRef] = Source
.actorPublisher[SomeClass](Props[SomeClassActorPublisher])
val someFlow: ActorRef = Flow[SomeClass]
.mapAsync(3)(f=> getDocumentById(f))
.map(f =>{
val request = HttpRequest(method = HttpMethods.POST, uri = "http://localhost:8000/test")
.withEntity(ContentTypes.`text/xml(UTF-8)`, ByteString(f.a)
)
(request,request)
}).via(connection)
//Parsing Response
.mapAsync(3){
case (Success(HttpResponse(status, _, entity, _)),request)=>
entity.dataBytes.runFold(ByteString(""))(_ ++ _)
}
.map(resp =>parse(resp.utf8String,?????????????) )
.to(Sink.someSink{....})
.runWith(someClassActorSource)
def parse(resp:String,parseParam:String)=????
and somewhere in the code I'm sending message to Flow:
someflow ! SomeClass("a","b","c")
someflow ! SomeClass("a1","b1","c1")
My problem is that method parse should use param2 from original case class
So for first message it should be
parse(response,"b")
and for second message it should be
parse(response,"b1")
So the question is, how can I fetch a parameter from the method I submitted to the flow?