I create a Source of consumer records using Reactive Kafka as follows:
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import org.apache.kafka.clients.consumer.ConsumerConfig

// system, keyDeserializer, valueDeserializer, bootstrapServers, groupName and topic are defined elsewhere
val settings = ConsumerSettings(system, keyDeserializer, valueDeserializer)
  .withBootstrapServers(bootstrapServers)
  .withGroupId(groupName)
  // what offset to begin with if there's no offset for this group
  .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
  // do we want to automatically commit offsets?
  .withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
  // auto-commit offsets every 1 second (1000 ms), in the background
  .withProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000")
  // wait 1 second before trying to reconnect, when disconnected
  .withProperty(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, "1000")
  // the consumer is considered dead if no heartbeat arrives within 30 seconds
  .withProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000")
  // send heartbeat every 10 seconds i.e. 1/3 * session.timeout.ms
  .withProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "10000")
  // maximum number of records returned by each poll()
  .withProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100")

val source = Consumer.atMostOnceSource(settings, Subscriptions.topics(topic)).map(_.value)
I have a single Kafka broker running on my local machine. I push values into the topic via the console producer and see them printed out. Then I kill Kafka and restart it to see whether the source reconnects.
This is how the logs proceed:
* Connection with /192.168.0.1 disconnected
java.net.ConnectException: Connection refused
* Give up sending metadata request since no node is available
* Consumer interrupted with WakeupException after timeout. Message: null. Current value of akka.kafka.consumer.wakeup-timeout is 3000 milliseconds
* Resuming partition test-events-0
* Error while fetching metadata with correlation id 139 : {test-events=INVALID_REPLICATION_FACTOR}
* Sending metadata request (type=MetadataRequest, topics=test-events) to node 0
* Sending GroupCoordinator request for group mytestgroup to broker 192.168.0.1:9092 (id: 0 rack: null)
* Consumer interrupted with WakeupException after timeout. Message: null. Current value of akka.kafka.consumer.wakeup-timeout is 3000 milliseconds
* Received GroupCoordinator response ClientResponse(receivedTimeMs=1491797713078, latencyMs=70, disconnected=false, requestHeader={api_key=10,api_version=0,correlation_id=166,client_id=consumer-1}, responseBody={error_code=15,coordinator={node_id=-1,host=,port=-1}}) for group mytestgroup
* Error while fetching metadata with correlation id 169 : {test-events=INVALID_REPLICATION_FACTOR}
* Received GroupCoordinator response ClientResponse(receivedTimeMs=1491797716169, latencyMs=72, disconnected=false, requestHeader={api_key=10,api_version=0,correlation_id=196,client_id=consumer-1}, responseBody={error_code=15,coordinator={node_id=-1,host=,port=-1}}) for group mytestgroup
09:45:16.169 [testsystem-akka.kafka.default-dispatcher-16] DEBUG o.a.k.c.c.i.AbstractCoordinator - Group coordinator lookup for group mytestgroup failed: The group coordinator is not available.
09:45:16.169 [testsystem-akka.kafka.default-dispatcher-16] DEBUG o.a.k.c.c.i.AbstractCoordinator - Coordinator discovery failed for group mytestgroup, refreshing metadata
* Initiating API versions fetch from node 2147483647
* Offset commit for group mytestgroup failed: This is not the correct coordinator for this group.
* Marking the coordinator 192.168.43.25:9092 (id: 2147483647 rack: null) dead for group mytestgroup
* The Kafka consumer has closed.
How do I make sure that this Source reconnects once the broker is back and continues processing records?
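One direction that may be worth trying is to wrap the consumer source in Akka Streams' RestartSource.withBackoff, so that the source is rebuilt whenever it fails or completes. This is offered as an untested sketch, not a confirmed fix. It assumes an Akka version that ships RestartSource (2.5.4 or later). The object name ReconnectingSource and the backoff values are hypothetical, chosen only for illustration.

```scala
import scala.concurrent.duration._

import akka.NotUsed
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.scaladsl.{RestartSource, Source}

// Hypothetical helper, not part of Reactive Kafka.
object ReconnectingSource {

  // Returns a source that creates a fresh Kafka consumer each time the
  // previous one fails or completes, waiting between attempts.
  def apply[K, V](settings: ConsumerSettings[K, V], topic: String): Source[V, NotUsed] =
    RestartSource.withBackoff(
      minBackoff = 1.second,    // assumed value: delay before the first restart
      maxBackoff = 30.seconds,  // assumed value: upper bound on the delay
      randomFactor = 0.2        // jitter, so restarts do not all happen in lockstep
    ) { () =>
      // Same source as in the question; it is re-created on every restart.
      Consumer.atMostOnceSource(settings, Subscriptions.topics(topic)).map(_.value)
    }
}
```

With an implicit materializer in scope, this could be run as ReconnectingSource(settings, topic).runForeach(println). One trade-off is that the wrapped source materializes to NotUsed, so the Consumer.Control of the inner source is no longer directly available for a graceful shutdown. Separately, the WakeupException lines in the log suggest the consumer may be stopping after repeated wakeup timeouts. If the Reactive Kafka version in use has an akka.kafka.consumer.max-wakeups setting next to wakeup-timeout (an assumption to check against that version's reference.conf), raising it might give the broker more time to come back before the consumer closes.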