I am using Scala 2.11 and Akka Streams Kafka 0.17.
I have a stream where:
- A Source is created using Source.actorRef. Here, the actor is scheduled to run at a regular interval and continuously generates messages, which are emitted to the stream.
- I have attached a Producer as a Flow. The producer pushes ProducerMessage.Message to a Kafka topic.
- Some DB operations.
I have a problem while constructing the ProducerMessage.Message, which looks like:
final case class Message[K, V, +PassThrough](
  record: ProducerRecord[K, V],
  passThrough: PassThrough
)
I can easily pass the record parameter, which contains the actual message. But I don't know what to pass in the passThrough parameter. According to the docs:
The passThrough field may hold any element that is passed through the Consumer#flow and included in the Result. That is useful when some context is needed to be passed on downstream operations. That could be done with unzip/zip, but this is more convenient. It can for example be a ConsumerMessage.CommittableOffset or ConsumerMessage.CommittableOffsetBatch that can be committed later in the flow.
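To make the quoted description concrete, here is a minimal sketch that uses stand-in types rather than the real library classes. Record, Result, publish and PassThroughSketch are hypothetical names invented for illustration. Only the shape of Message is taken from the definition above. The sketch shows only that the passThrough value travels with the message and comes back out in the result, and that its type parameter is unconstrained. It does not establish what the recommended value is for the real Producer flow.

```scala
// Stand-in types for illustration only; these are NOT the akka-stream-kafka classes.
final case class Record[K, V](topic: String, key: K, value: V)
final case class Message[K, V, +PassThrough](record: Record[K, V], passThrough: PassThrough)
final case class Result[K, V, PassThrough](offset: Long, message: Message[K, V, PassThrough])

object PassThroughSketch {
  // Hypothetical "producer" step: pretends to publish the record and hands the
  // original message (including its passThrough value) back inside the result.
  def publish[K, V, P](msg: Message[K, V, P], offset: Long): Result[K, V, P] =
    Result(offset, msg)

  def main(args: Array[String]): Unit = {
    // Some context a later stage might need, e.g. a database row id (hypothetical).
    val withContext =
      publish(Message(Record("events", "k1", "v1"), passThrough = 42L), offset = 0L)
    println(withContext.message.passThrough)

    // No context needed downstream: a placeholder value such as Unit also type-checks.
    val noContext =
      publish(Message(Record("events", "k2", "v2"), passThrough = ()), offset = 1L)
    println(noContext.message.passThrough)
  }
}
```

In this sketch the downstream code reads the context back through result.message.passThrough, which is the role the docs describe for a committable offset when a real consumer is present.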
In my case there is no Kafka consumer subscribing to a Kafka topic and generating a Source (committableSource or plainSource) for my stream. If there were, I would have passed the consumer offset as described in the docs. But in my case an actor is simulating such a consumer, so I don't have access to ConsumerMessage.CommittableOffset. So what do I pass for the passThrough parameter here? What would be the best practice in this case?