1
votes

I have built a very simple akka stream based on the alpakka project, but it doesn't read anything from kafka even though it connects and creates a consumer group. I have created an implicit Actor System and Materializer for the stream.

val done = Consumer.committableSource(consumerSettings,
Subscriptions.topics(kafkaTopic))
.map(msg => msg.committableOffset)
.mapAsync(1) { offset =>
offset.commitScaladsl()
}
.runWith(Sink.ignore)
  • [stream.actor.dispatcher] sends this message to KafkaConsumerActor "Requesting messages, requestId: 1, partitions: Set(kafka-topic-0)"
  • The KafkaConsumerActor doesn't seem to receive the message but when the supervisor asks the Actor to shutdown it does receive the message and shutdown.

Any lead on why it fails to read Kafka without an Error or Exception ?

1
on enabling further logs, I found that - DEBUG] [06/06/2019 18:25:29.655] [akka-kafka-akka.kafka.default-dispatcher-15] [akka://akka-kafka-poc/system/kafka-consumer-1] received handled message Poll(akka.kafka.internal.KafkaConsumerActor@20eb829b,true) from Actor[akka://akka-kafka-poc/deadLetters] The kafka-consumer is receiveing messages from an actor deadLetters. Not sure whether the kafka-consumer actor is getting killed or the dispatcher that sends the poll to the kafka-consumer is dead.b1tchacked

1 Answers

0
votes

I couldn't figure out why my akka stream wasn't consuming messages from the kafka broker, But When I implemented the same stream as a Runnable Graph, it worked.

Examples that I used - https://www.programcreek.com/scala/akka.stream.scaladsl.RunnableGraph