0
votes

I am trying my hand at the Akka Stream DSL. The thing that I cannot get to compile is strongly typed Flows. For Zip I do the following and it compiles fine:

val fanIn = builder.add(Zip[LoginResponse, LoginCommand.UserData])

For Flow however I cannot seem to get it working as desired since the following does not compile:

val encryptLoginData = builder.add(Flow[(LoginResponse, LoginCommand.UserData), LoginCommand.UserData])

From the error message I cannot really tell what I am doing wrong.

I noticed that apply is only taking a single type, but how is this supposed to show mapping between in <-> out?

Any hints on what I am doing wrong, or how this is supposed to be declared?

2 Answers

1
votes

Define your flow in the following manner:

val flow =
  Flow[(LoginResponse, LoginCommand.UserData)]
    .map { case (response, user) => // `case` destructures the tuple element (required in Scala 2)
      // do something with response and/or user that returns a LoginCommand.UserData
    } // Flow[(LoginResponse, LoginCommand.UserData), LoginCommand.UserData, _]

fanIn.out ~> flow

The Flow.apply[T] method, as the Scaladoc indicates, "returns a Flow which outputs all its inputs." This is the most basic Flow and serves as a building block for more complex Flows. Typically, one needs to do more than merely output all the input elements; the common use case is to define a Flow that transforms the input in some way and outputs the result of that transformation. Your attempt to add the following Flow...

Flow[(LoginResponse, LoginCommand.UserData), LoginCommand.UserData]

...suggests that you want to take an input of type (LoginResponse, LoginCommand.UserData) and output a LoginCommand.UserData. However, in order to do so, you have to define as part of the Flow how you want to transform an input element of type (LoginResponse, LoginCommand.UserData) to a LoginCommand.UserData.
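To make this concrete, here is a minimal end-to-end sketch of the Zip plus typed Flow wiring. It assumes Akka 2.6 or later (where an implicit ActorSystem is enough to run a graph). The case classes are hypothetical stand-ins for your LoginResponse and LoginCommand.UserData, and the "encryption" is only a placeholder string concatenation, not a real cipher.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ClosedShape
import akka.stream.scaladsl.{Flow, GraphDSL, RunnableGraph, Sink, Source, Zip}

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical stand-ins for the types in the question.
final case class LoginResponse(token: String)
object LoginCommand {
  final case class UserData(name: String, password: String)
}

object TypedFlowExample {
  // Flow[In] fixes only the input type; .map supplies the In => Out function,
  // which is what gives the resulting Flow its output type.
  val encryptLoginData: Flow[(LoginResponse, LoginCommand.UserData), LoginCommand.UserData, NotUsed] =
    Flow[(LoginResponse, LoginCommand.UserData)].map { case (response, user) =>
      // Placeholder transformation (assumption): prefix the password with the token.
      user.copy(password = s"${response.token}:${user.password}")
    }

  def run(): Seq[LoginCommand.UserData] = {
    implicit val system: ActorSystem = ActorSystem("typed-flow-example")
    try {
      val graph = RunnableGraph.fromGraph(
        GraphDSL.create(Sink.seq[LoginCommand.UserData]) { implicit builder => sink =>
          import GraphDSL.Implicits._

          val fanIn   = builder.add(Zip[LoginResponse, LoginCommand.UserData]())
          val encrypt = builder.add(encryptLoginData)

          Source.single(LoginResponse("abc"))                     ~> fanIn.in0
          Source.single(LoginCommand.UserData("alice", "secret")) ~> fanIn.in1
          fanIn.out ~> encrypt ~> sink

          ClosedShape
        }
      )
      // Blocking is acceptable for a demo; avoid it in production code.
      Await.result(graph.run(), 3.seconds)
    } finally {
      system.terminate()
    }
  }
}
```

Calling TypedFlowExample.run() materializes the graph and returns the elements collected by the sink. The point to notice is that the output type never appears in Flow.apply; it is inferred from the function passed to map.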

0
votes

Because of the answer by @JeffreyChung, I am reconsidering whether Flow is the best fit here. I have now tried the more basic FlowShape, which at least compiles.

    val encryptIn = Inlet[(LoginResponse, LoginCommand.UserData)]("EncryptInput")
    val encryptOut = Outlet[LoginCommand.UserData]("EncryptOutput")
    val encryptLoginData = FlowShape(encryptIn, encryptOut)

Not as elegant as I want it to be, but neither is any solution I have seen using Flow.

So is this the intended way of using the DSL?