0
votes

I am running an Akka Streams Kafka application and I want to incorporate the supervision strategy on the stream consumer such that if the broker goes down, and the stream consumer dies after a stop timeout, the supervisor can restart the consumer.

Here is my complete code:

UserEventStream:

import akka.actor.{Actor, PoisonPill, Props}
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.scaladsl.Sink
import akka.util.Timeout
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}

import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}
import akka.pattern.ask
import akka.stream.ActorMaterializer

class UserEventStream extends Actor {

  val settings = Settings(context.system).KafkaConsumers
  implicit val timeout: Timeout = Timeout(10 seconds)
  implicit val materializer = ActorMaterializer()

  override def preStart(): Unit = {
    super.preStart()
    println("Starting UserEventStream....s")
  }
  override def receive = {
    case "start" =>
      val consumerConfig = settings.KafkaConsumerInfo
      println(s"ConsumerConfig with $consumerConfig")
      startStreamConsumer(consumerConfig("UserEventMessage" + ".c" + 1))
  }

  def startStreamConsumer(config: Map[String, String]) = {
    println(s"startStreamConsumer with config $config")

    val consumerSource = createConsumerSource(config)
    val consumerSink = createConsumerSink()
    val messageProcessor = context.actorOf(Props[MessageProcessor], "messageprocessor")

    println("START: The UserEventStream processing")
    val future =
      consumerSource
        .mapAsync(parallelism = 50) { message =>
          val m = s"${message.record.value()}"
          messageProcessor ? m
        }
        .runWith(consumerSink)
    future.onComplete {
      case Failure(ex) =>
        println("FAILURE : The UserEventStream processing, stopping the actor.")
        self ! PoisonPill
      case Success(ex) =>
    }
  }

  def createConsumerSource(config: Map[String, String]) = {
    val kafkaMBAddress = config("bootstrap-servers")
    val groupID = config("groupId")
    val topicSubscription = config("subscription-topic").split(',').toList
    println(s"Subscriptiontopics $topicSubscription")

    val consumerSettings = ConsumerSettings(context.system, new ByteArrayDeserializer, new StringDeserializer)
      .withBootstrapServers(kafkaMBAddress)
      .withGroupId(groupID)
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
      .withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")

    Consumer.committableSource(consumerSettings, Subscriptions.topics(topicSubscription: _*))
  }

  def createConsumerSink() = {
    Sink.foreach(println)
  }
}  

StreamProcessorSupervisor (this is the supervisor class of the UserEventStream class):

import akka.actor.{Actor, Props}
import akka.pattern.{Backoff, BackoffSupervisor}
import akka.stream.ActorMaterializer
import stream.StreamProcessorSupervisor.StartClient
import scala.concurrent.duration._

object StreamProcessorSupervisor {
  final case object StartSimulator
  final case class StartClient(id: String)
  def props(implicit materializer: ActorMaterializer) =
    Props(classOf[StreamProcessorSupervisor], materializer)
}

class StreamProcessorSupervisor(implicit materializer: ActorMaterializer) extends Actor {
  override def preStart(): Unit = {
    self ! StartClient(self.path.name)
  }

  def receive: Receive = {
    case StartClient(id) =>
      println(s"startCLient with id $id")
      val childProps = Props(classOf[UserEventStream])
      val supervisor = BackoffSupervisor.props(
        Backoff.onFailure(
          childProps,
          childName = "usereventstream",
          minBackoff = 1.second,
          maxBackoff = 1.minutes,
          randomFactor = 0.2
        )
      )
      context.actorOf(supervisor, name = s"$id-backoff-supervisor")
      val userEventStrean = context.actorOf(Props(classOf[UserEventStream]),"usereventstream")
      userEventStrean ! "start"
  }
}

App (the main application class):

import akka.actor.{ActorSystem, Props}
import akka.stream.ActorMaterializer

object App extends App {

  implicit val system = ActorSystem("stream-test")
  implicit val materializer = ActorMaterializer()

  system.actorOf(StreamProcessorSupervisor.props,"StreamProcessorSupervisor")
}

application.conf:

kafka {

  consumer {

    num-consumers = "1"
    c1 {
      bootstrap-servers = "localhost:9092"
      bootstrap-servers = ${?KAFKA_CONSUMER_ENDPOINT1}
      groupId = "localakkagroup1"
      subscription-topic = "test"
      subscription-topic = ${?SUBSCRIPTION_TOPIC1}
      message-type = "UserEventMessage"
      poll-interval = 50ms
      poll-timeout = 50ms
      stop-timeout = 30s
      close-timeout = 20s
      commit-timeout = 15s
      wakeup-timeout = 10s
      max-wakeups = 10
      use-dispatcher = "akka.kafka.default-dispatcher"
      kafka-clients {
        enable.auto.commit = true
      }
    }
  }
}

After running the application, I purposely killed the Kafka broker and then found that after 30 seconds, the actor is stopping itself by sending a poison pill. But strangely it doesn't restart as mentioned in the BackoffSupervisor strategy.

What could be the issue here?

1

1 Answers

0
votes

There are two instances of UserEventStream in your code: one is the child actor that the BackoffSupervisor internally creates with the Props that you pass to it, and the other is the val userEventStrean that is a child of StreamProcessorSupervisor. You're sending the "start" message to the latter, when you should be sending that message to the former.

You don't need val userEventStrean, because the BackoffSupervisor creates the child actor. Messages sent to the BackoffSupervisor are forwarded to the child, so to send a "start" message to the child, send it to the BackoffSupervisor:

class StreamProcessorSupervisor(implicit materializer: ActorMaterializer) extends Actor {
  override def preStart(): Unit = {
    self ! StartClient(self.path.name)
  }

  def receive: Receive = {
    case StartClient(id) =>
      println(s"startCLient with id $id")
      val childProps = Props[UserEventStream]
      val supervisorProps = BackoffSupervisor.props(...)
      val supervisor = context.actorOf(supervisorProps, name = s"$id-backoff-supervisor")
      supervisor ! "start"
  }
}

The other issue is that when an actor receives a PoisonPill, that's not the same thing as that actor throwing an exception. Therefore, Backoff.onFailure won't be triggered when UserEventStream sends itself a PoisonPill. A PoisonPill stops the actor, so use Backoff.onStop instead:

val supervisorProps = BackoffSupervisor.props(
  Backoff.onStop( // <--- use onStop
    childProps,
    ...
  )
)
val supervisor = context.actorOf(supervisorProps, name = s"$id-backoff-supervisor")
supervisor ! "start"