
I am using Scala 2.11 and Akka Streams Kafka 0.17.

I have a stream where:

  • A Source is created using Source.actorRef. Here, the actor is scheduled to run at some regular interval and generate messages continuously, which are emitted to the stream.
  • I have attached a Producer as a Flow. The producer pushes ProducerMessage.Message to a Kafka topic.
  • Some DB operations.

I have a problem while constructing the ProducerMessage.Message, which looks like:

final case class Message[K, V, +PassThrough](
  record: ProducerRecord[K, V],
  passThrough: PassThrough
)

I can easily pass the record parameter which contains the actual message. But I don't know what to pass in the passThrough parameter. According to the docs:

The passThrough field may hold any element that is passed through the Producer#flow and included in the Result. That is useful when some context is needed to be passed on downstream operations. That could be done with unzip/zip, but this is more convenient. It can for example be a ConsumerMessage.CommittableOffset or ConsumerMessage.CommittableOffsetBatch that can be committed later in the flow.

In my case, no Kafka consumer subscribes to a topic and provides the Source (committableSource or plainSource) for my stream. If there were one, I would pass the consumer offset as described in the docs. Instead, an actor simulates such a consumer, so I don't have access to a ConsumerMessage.CommittableOffset. What should I pass for the passThrough parameter, and what is the best practice in this case?

1 Answer


After raising this issue with the reactive-kafka team, I got an answer. Basically, they said that if you have no use case for passing anything through, you can set passThrough to None or NotUsed, or perhaps just the empty String "".
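
As a rough illustration of that suggestion, here is a minimal sketch that uses NotUsed as the pass-through. It assumes String keys and values; the topic name "events" and the names PassThroughExample, toMessage and kafkaFlow are made up for the example and are not from the original stream.

```scala
import akka.NotUsed
import akka.kafka.{ProducerMessage, ProducerSettings}
import akka.kafka.scaladsl.Producer
import akka.stream.scaladsl.Flow
import org.apache.kafka.clients.producer.ProducerRecord

object PassThroughExample {
  // Hypothetical topic name, used only for illustration.
  val topic = "events"

  // Wrap a value in a ProducerMessage.Message with NotUsed as the
  // pass-through, since there is no context to carry downstream.
  def toMessage(value: String): ProducerMessage.Message[String, String, NotUsed] =
    ProducerMessage.Message(new ProducerRecord[String, String](topic, value), NotUsed)

  // Producer.flow publishes each message and emits a Result that
  // carries the same pass-through type (here NotUsed) downstream.
  def kafkaFlow(settings: ProducerSettings[String, String])
      : Flow[ProducerMessage.Message[String, String, NotUsed],
             ProducerMessage.Result[String, String, NotUsed],
             NotUsed] =
    Producer.flow[String, String, NotUsed](settings)
}
```

The actor-backed Source could then be wired as source.map(PassThroughExample.toMessage).via(PassThroughExample.kafkaFlow(settings)), with the DB operations attached after the flow. NotUsed is chosen here over None or "" only because it states the intent in the type; any of the three should behave the same.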

Also note that if you use a Producer.plainSink, you do not need to construct a ProducerMessage.Message at all; you can pass a Kafka ProducerRecord directly. The ProducerMessage.Message case class is just a container for cases where a pass-through is desired or needed. Other than the element to be passed through, it contains only a Kafka ProducerRecord.
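
A minimal sketch of the plainSink variant, again assuming String keys and values; the topic name "events" and the names PlainSinkExample, toRecord and kafkaSink are hypothetical.

```scala
import akka.Done
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.scaladsl.Sink
import org.apache.kafka.clients.producer.ProducerRecord
import scala.concurrent.Future

object PlainSinkExample {
  // Hypothetical topic name, used only for illustration.
  val topic = "events"

  // No ProducerMessage.Message wrapper: the sink takes plain records.
  def toRecord(value: String): ProducerRecord[String, String] =
    new ProducerRecord[String, String](topic, value)

  // Producer.plainSink consumes ProducerRecords and materializes a
  // Future that completes when the stream finishes.
  def kafkaSink(settings: ProducerSettings[String, String])
      : Sink[ProducerRecord[String, String], Future[Done]] =
    Producer.plainSink(settings)
}
```

Because a Sink ends the stream, this form only fits when nothing has to happen after the write to Kafka. If the DB operations must run downstream of the producer, Producer.flow with a placeholder pass-through is the form that keeps the stream going.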