[INFO] [06/01/2020 05:05:53.947] [default-akka.actor.default-dispatcher-6] [SingleSourceLogic(akka://default)] [7347a] Starting. StageActor Actor[akka://default/system/Materializers/StreamSupervisor-0/$$a#-993709322]
SLF4J: No SLF4J providers were found.
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#noProviders for further details.
[INFO] [06/01/2020 05:06:05.058] [default-akka.actor.default-dispatcher-6] [SingleSourceLogic(akka://default)] [7347a] Completing
[INFO] [akkaDeadLetter][06/01/2020 05:06:35.165] [default-akka.actor.default-dispatcher-24] [akka://default/system/kafka-consumer-1] Message [akka.kafka.internal.KafkaConsumerActor$Internal$StopFromStage] from Actor[akka://default/system/Materializers/StreamSupervisor-0/$$a#-993709322] to Actor[akka://default/system/kafka-consumer-1#76265671] was not delivered. [1] dead letters encountered. If this is not an expected behavior then Actor[akka://default/system/kafka-consumer-1#76265671] may have terminated unexpectedly. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

I get the dead-letter log above, but only when I run a flow with a Kafka source.

Here is my code:

val bookData: Source[ConsumerRecord[Array[Byte], String], Consumer.Control] = KafkaSource.createSource(TOPIC)
val parseBook: Flow[ConsumerRecord[Array[Byte], String], Book, NotUsed] = Flow[ConsumerRecord[Array[Byte], String]].map { message =>
  Json.parse(message.value).as[Book]
}
val flow: Flow[Book, Book, NotUsed] = Flow[Book].mapAsync(4)(book => toInstant(book))

def toInstant(book: Book): Future[Book] = Future {
  val array = book.publicationDate.split("-")
  val instant = Time.instantOfDate(array(2).toInt, Month.of(array(2).toInt), array(0).toInt)
  val timeTillNow = Time.timeSince(instant)
  val royalty = if (timeTillNow.toDays > 1000) {
    .10 * book.price * book.copiesSold
  } else {
    .15 * book.price * book.copiesSold
  }
  book.copy(royalty = Some(royalty))
}
val print: Sink[Any, Future[Done]] = Sink.foreach(element => println(element))

Running bookData.via(parseBook).via(flow).runWith(print) produces the dead letter, but running bookData.via(parseBook).runWith(print) does not. The latter also prints the expected output.

Can someone help?

1 Answer

This is most likely happening because the flow is throwing an exception. Normally, I'd expect to see logging with more details, but those might be missing because no SLF4J implementation is configured. See "SLF4J backend" in the Akka documentation for details on how to configure logging.

Looking through your code, I think the error is most likely on this line:

val instant = Time.instantOfDate(array(2).toInt, Month.of(array(2).toInt), array(0).toInt)

Note that it's using array(2).toInt for both the day and the month. My guess is that the month should be Month.of(array(1).toInt). If array(2) is indeed the day of the month, then any values above 12 will cause Month.of to throw a DateTimeException, which will terminate the stream.
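
To check this guess outside the stream, here is a minimal sketch using only java.time. It assumes publicationDate looks like "year-month-day" (for example "2020-01-25"), which the question does not confirm. The object and method names are hypothetical, and LocalDate.of stands in for your Time.instantOfDate helper, whose implementation isn't shown.

```scala
import java.time.{LocalDate, Month}
import scala.util.Try

// Hypothetical helpers for illustration; not part of the original code.
object MonthOfCheck {

  // Assumed layout: parts(0) = year, parts(1) = month, parts(2) = day.
  def parseDate(publicationDate: String): Try[LocalDate] = Try {
    val parts = publicationDate.split("-")
    LocalDate.of(parts(0).toInt, Month.of(parts(1).toInt), parts(2).toInt)
  }

  // Mirrors the indexing in the question: the day field is passed to Month.of.
  def parseDateBuggy(publicationDate: String): Try[LocalDate] = Try {
    val parts = publicationDate.split("-")
    LocalDate.of(parts(0).toInt, Month.of(parts(2).toInt), parts(2).toInt)
  }
}
```

With "2020-01-25", parseDateBuggy returns a Failure wrapping a DateTimeException, because 25 is not a valid month. With "2020-01-05" it succeeds but silently yields May 5 instead of January 5, so a day of 12 or less would hide the problem rather than fail. Inside toInstant, the same exception fails the Future, and a failed Future in mapAsync fails the stream.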