I have the following code snippet:

    case class SomeClass(param1:String,param2:String,param3:String)

    val someClassActorSource: Source[SomeClass, ActorRef] = Source
      .actorPublisher[SomeClass](Props[SomeClassActorPublisher])

    val someFlow: ActorRef = Flow[SomeClass]

        .mapAsync(3)(f=> getDocumentById(f))

        .map(f =>{
          val request = HttpRequest(method = HttpMethods.POST, uri = "http://localhost:8000/test")
            .withEntity(ContentTypes.`text/xml(UTF-8)`, ByteString(f.a)
            )
          (request,request)

        }).via(connection)

        //Parsing Response
        .mapAsync(3){
          case (Success(HttpResponse(status, _, entity, _)),request)=>
            entity.dataBytes.runFold(ByteString(""))(_ ++ _)
        }
        .map(resp =>parse(resp.utf8String,?????????????) )
        .to(Sink.someSink{....})
        .runWith(someClassActorSource)

    def parse(resp:String,parseParam:String)=????

Elsewhere in the code I send messages to the flow:

    someFlow ! SomeClass("a","b","c")
    someFlow ! SomeClass("a1","b1","c1")

My problem is that the parse method needs param2 from the original case class instance.

So for first message it should be

  parse(response,"b")

and for second message it should be

  parse(response,"b1")

So the question is: how can I access a field of the message I originally sent into the flow at a later stage of that flow?

1 Answer

Assuming your connection value is being instantiated via

val connection = Http().cachedHostConnectionPool(...)

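You can use the fact that the connection pool flow works on tuples: it accepts (HttpRequest, T) and emits (Try[HttpResponse], T), passing the second element through unchanged. Instead of passing the request twice in the tuple, pass the original SomeClass as the second element (the pool's type parameter then has to be SomeClass). That SomeClass instance must be carried through each of your Flow stages so that it reaches the parsing stage.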
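To make the pass-through idea concrete without a running server, here is a minimal sketch in plain Scala. It does not use Akka HTTP: `Request`, `Response`, `pool` and the `send` function are hypothetical stand-ins that only imitate the tuple-in, tuple-out shape described above.

```scala
import scala.util.{Failure, Success, Try}

object PoolModel {
  // Hypothetical stand-ins for HttpRequest / HttpResponse (not Akka HTTP types).
  final case class Request(body: String)
  final case class Response(body: String)

  // Imitates the pool's shape: the second tuple element comes back untouched,
  // whether the request succeeded or failed.
  def pool[T](send: Request => Response)(in: (Request, T)): (Try[Response], T) = {
    val (request, context) = in
    (Try(send(request)), context)
  }

  def main(args: Array[String]): Unit = {
    val echo: Request => Response = r => Response(r.body.toUpperCase)
    val failing: Request => Response = _ => throw new RuntimeException("connection refused")

    List(
      pool[String](echo)((Request("doc"), "b")),
      pool[String](failing)((Request("doc"), "b1"))
    ).foreach {
      // The context (here param2) is still available next to the outcome.
      case (Success(response), param2) => println(s"parse(${response.body}, $param2)")
      case (Failure(error), param2)    => println(s"request for $param2 failed: ${error.getMessage}")
    }
  }
}
```

Note that the context survives even when the request fails, which is handy if you want to log or retry the message that caused the failure.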

Modifying your code a bit:

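// Pair each fetched document with the SomeClass it was fetched for
// (the Future's map needs an implicit ExecutionContext in scope).
val getDocumentFlow = 
  Flow[SomeClass].mapAsync(3)(f => getDocumentById(f).map( d => d -> f))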

Your question doesn't state the return type of getDocumentById, so I'm assuming it returns a Future[Document]:

val documentToRequest = 
  Flow[(Document, SomeClass)] map { case (document, someClass) =>
    val request = ...

    (request, someClass)
  }

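val parseResponse = 
  Flow[(Try[HttpResponse], SomeClass)].mapAsync(3){
    case (Success(HttpResponse(status, _, entity, _)), someClass) =>
      entity
        .dataBytes
        .runFold(ByteString(""))(_ ++ _)
        .map(e => e -> someClass)
    // Without this case a failed request would surface as a MatchError.
    case (Failure(ex), _) => Future.failed(ex)
  }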

val parseEntity = Flow[(ByteString, SomeClass)] map { 
  // parse expects a String, so pass the param2 field of the original message
  case (entity, someClass) => parse(entity.utf8String, someClass.param2)
}

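These flows can then be composed into the stream described in the question. Because to keeps the source's materialized value, someFlow is still the ActorRef you send messages to: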

val someFlow = 
  someClassActorSource
    .via(getDocumentFlow)
    .via(documentToRequest)
    .via(connection)
    .via(parseResponse)
    .via(parseEntity)
    .to(Sink.someSink{...})
    .run()
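If you want to convince yourself of the pattern without Akka on the classpath, the same "carry the original message alongside each intermediate result" chain can be sketched with plain Futures. Everything below is an assumption made for illustration: `Document`, the bodies of `getDocumentById`, `send` and `parse`, and the `process` helper are hypothetical stand-ins, not your real implementations.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ContextThreadingSketch {
  case class SomeClass(param1: String, param2: String, param3: String)
  // Hypothetical document type; the question does not show the real one.
  case class Document(a: String)

  // Hypothetical stand-ins for the real asynchronous steps.
  def getDocumentById(f: SomeClass): Future[Document] = Future(Document(s"doc-${f.param1}"))
  def send(doc: Document): Future[String] = Future(s"<response for ${doc.a}>")
  def parse(resp: String, parseParam: String): String = s"$parseParam: $resp"

  // Each step pairs its result with the original SomeClass,
  // so param2 is still at hand when parse is finally called.
  def process(f: SomeClass): Future[String] =
    getDocumentById(f).map(d => d -> f)
      .flatMap { case (doc, someClass) => send(doc).map(r => r -> someClass) }
      .map { case (resp, someClass) => parse(resp, someClass.param2) }

  def main(args: Array[String]): Unit = {
    val inputs = List(SomeClass("a", "b", "c"), SomeClass("a1", "b1", "c1"))
    val results = Await.result(Future.traverse(inputs)(process), 5.seconds)
    results.foreach(println)
  }
}
```

The first message ends up in parse with "b" and the second with "b1", which is the behaviour asked for in the question.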