I am running an Akka Streams Kafka application and I want to incorporate the supervision strategy on the stream consumer such that if the broker goes down, and the stream consumer dies after a stop timeout, the supervisor can restart the consumer.
Here is my complete code:
UserEventStream
:
import akka.actor.{Actor, PoisonPill, Props}
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.scaladsl.Sink
import akka.util.Timeout
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}
import akka.pattern.ask
import akka.stream.ActorMaterializer
class UserEventStream extends Actor {
val settings = Settings(context.system).KafkaConsumers
implicit val timeout: Timeout = Timeout(10 seconds)
implicit val materializer = ActorMaterializer()
override def preStart(): Unit = {
super.preStart()
println("Starting UserEventStream....s")
}
override def receive = {
case "start" =>
val consumerConfig = settings.KafkaConsumerInfo
println(s"ConsumerConfig with $consumerConfig")
startStreamConsumer(consumerConfig("UserEventMessage" + ".c" + 1))
}
def startStreamConsumer(config: Map[String, String]) = {
println(s"startStreamConsumer with config $config")
val consumerSource = createConsumerSource(config)
val consumerSink = createConsumerSink()
val messageProcessor = context.actorOf(Props[MessageProcessor], "messageprocessor")
println("START: The UserEventStream processing")
val future =
consumerSource
.mapAsync(parallelism = 50) { message =>
val m = s"${message.record.value()}"
messageProcessor ? m
}
.runWith(consumerSink)
future.onComplete {
case Failure(ex) =>
println("FAILURE : The UserEventStream processing, stopping the actor.")
self ! PoisonPill
case Success(ex) =>
}
}
def createConsumerSource(config: Map[String, String]) = {
val kafkaMBAddress = config("bootstrap-servers")
val groupID = config("groupId")
val topicSubscription = config("subscription-topic").split(',').toList
println(s"Subscriptiontopics $topicSubscription")
val consumerSettings = ConsumerSettings(context.system, new ByteArrayDeserializer, new StringDeserializer)
.withBootstrapServers(kafkaMBAddress)
.withGroupId(groupID)
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
.withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
Consumer.committableSource(consumerSettings, Subscriptions.topics(topicSubscription: _*))
}
def createConsumerSink() = {
Sink.foreach(println)
}
}
StreamProcessorSupervisor
(this is the supervisor class of the UserEventStream
class):
import akka.actor.{Actor, Props}
import akka.pattern.{Backoff, BackoffSupervisor}
import akka.stream.ActorMaterializer
import stream.StreamProcessorSupervisor.StartClient
import scala.concurrent.duration._
object StreamProcessorSupervisor {
final case object StartSimulator
final case class StartClient(id: String)
def props(implicit materializer: ActorMaterializer) =
Props(classOf[StreamProcessorSupervisor], materializer)
}
class StreamProcessorSupervisor(implicit materializer: ActorMaterializer) extends Actor {
override def preStart(): Unit = {
self ! StartClient(self.path.name)
}
def receive: Receive = {
case StartClient(id) =>
println(s"startCLient with id $id")
val childProps = Props(classOf[UserEventStream])
val supervisor = BackoffSupervisor.props(
Backoff.onFailure(
childProps,
childName = "usereventstream",
minBackoff = 1.second,
maxBackoff = 1.minutes,
randomFactor = 0.2
)
)
context.actorOf(supervisor, name = s"$id-backoff-supervisor")
val userEventStrean = context.actorOf(Props(classOf[UserEventStream]),"usereventstream")
userEventStrean ! "start"
}
}
App
(the main application class):
import akka.actor.{ActorSystem, Props}
import akka.stream.ActorMaterializer
object App extends App {
implicit val system = ActorSystem("stream-test")
implicit val materializer = ActorMaterializer()
system.actorOf(StreamProcessorSupervisor.props,"StreamProcessorSupervisor")
}
application.conf
:
kafka {
consumer {
num-consumers = "1"
c1 {
bootstrap-servers = "localhost:9092"
bootstrap-servers = ${?KAFKA_CONSUMER_ENDPOINT1}
groupId = "localakkagroup1"
subscription-topic = "test"
subscription-topic = ${?SUBSCRIPTION_TOPIC1}
message-type = "UserEventMessage"
poll-interval = 50ms
poll-timeout = 50ms
stop-timeout = 30s
close-timeout = 20s
commit-timeout = 15s
wakeup-timeout = 10s
max-wakeups = 10
use-dispatcher = "akka.kafka.default-dispatcher"
kafka-clients {
enable.auto.commit = true
}
}
}
}
After running the application, I purposely killed the Kafka broker and then found that after 30 seconds, the actor is stopping itself by sending a poison pill. But strangely it doesn't restart as mentioned in the BackoffSupervisor
strategy.
What could be the issue here?