[INFO] [06/01/2020 05:05:53.947] [default-akka.actor.default-dispatcher-6] [SingleSourceLogic(akka://default)] [7347a] Starting. StageActor Actor[akka://default/system/Materializers/StreamSupervisor-0/$$a#-993709322] SLF4J: No SLF4J providers were found. SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#noProviders for further details. [INFO] [06/01/2020 05:06:05.058] [default-akka.actor.default-dispatcher-6] [SingleSourceLogic(akka://default)] [7347a] Completing [INFO] [akkaDeadLetter][06/01/2020 05:06:35.165] [default-akka.actor.default-dispatcher-24] [akka://default/system/kafka-consumer-1] Message [akka.kafka.internal.KafkaConsumerActor$Internal$StopFromStage] from Actor[akka://default/system/Materializers/StreamSupervisor-0/$$a#-993709322] to Actor[akka://default/system/kafka-consumer-1#76265671] was not delivered. [1] dead letters encountered. If this is not an expected behavior then Actor[akka://default/system/kafka-consumer-1#76265671] may have terminated unexpectedly. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
This is the issue I'm facing but only when I'm using a flow with Kafka source.
Here is my code
val bookData: Source[ConsumerRecord[Array[Byte], String], Consumer.Control] = KafkaSource.createSource(TOPIC)
val parseBook: Flow[ConsumerRecord[Array[Byte], String], Book, NotUsed] = Flow[ConsumerRecord[Array[Byte], String]].map { message =>
Json.parse(message.value).as[Book]
}
val flow: Flow[Book, Book, NotUsed] = Flow[Book].mapAsync(4)(book => toInstant(book))
def toInstant(book: Book): Future[Book] = Future{
val array = book.publicationDate.split("-")
val instant = Time.instantOfDate(array(2).toInt, Month.of(array(2).toInt), array(0).toInt)
val timeTillNow = Time.timeSince(instant)
val royalty = if (timeTillNow.toDays > 1000) {
.10 * book.price * book.copiesSold
} else {
.15 * book.price * book.copiesSold
}
book.copy(royalty = Some(royalty))
}
val print: Sink[Any, Future[Done]] = Sink.foreach(element => println(element))
On running following I'm getting the dead-letter
bookData.via(parseBook).via(flow).runWith(print)
but no dead letter while running this: bookData.via(parseBook).runWith(print)
.
Even on running the later one I'm getting the expected output.
Can someone help?