We are observing a strange behavior with our Servicetest and embedded Kafka.
The Test is a Spock Test, we use the JUnit Rule KafkaEmbedded and propagate brokersAsString as follows:
@ClassRule
@Shared
KafkaEmbedded embeddedKafka = new KafkaEmbedded(1)
@Autowired
KafkaListenerEndpointRegistry endpointRegistry
def setupSpec() {
System.setProperty("kafka.bootstrapServers", embeddedKafka.getBrokersAsString())
}
From inspecting the Code of KafkaEmbedded, constructing an Instance with KafkaEmbedded(int count)
leads to one Kafka Server with two partitions per topic.
In order to tackle issues with partition assignment and server-client synchronization in the test, we follow the strategy as seen in ContainerTestUtils class from spring-kafka.
public static void waitForAssignment(KafkaMessageListenerContainer<String, String> container, int partitions)
throws Exception {
log.info(
"Waiting for " + container.getContainerProperties().getTopics() + " to connect to " + partitions + " " +
"partitions.")
int n = 0;
int count = 0;
while (n++ < 600 && count < partitions) {
count = 0;
container.getAssignedPartitions().each {
TopicPartition it ->
log.info(it.topic() + ":" + it.partition() + "; ")
}
if (container.getAssignedPartitions() != null) {
count = container.getAssignedPartitions().size();
}
if (count < partitions) {
Thread.sleep(100);
}
}
}
When we observe the logs we notice the following pattern:
2016-07-29 11:24:02.600 WARN 1160 --- [afka-consumer-1] org.apache.kafka.clients.NetworkClient : Error while fetching metadata with correlation id 1 : {deliveryZipCode_v1=LEADER_NOT_AVAILABLE}
2016-07-29 11:24:02.600 WARN 1160 --- [afka-consumer-1] org.apache.kafka.clients.NetworkClient : Error while fetching metadata with correlation id 1 : {staggering=LEADER_NOT_AVAILABLE}
2016-07-29 11:24:02.600 WARN 1160 --- [afka-consumer-1] org.apache.kafka.clients.NetworkClient : Error while fetching metadata with correlation id 1 : {moa=LEADER_NOT_AVAILABLE}
2016-07-29 11:24:02.696 WARN 1160 --- [afka-consumer-1] org.apache.kafka.clients.NetworkClient : Error while fetching metadata with correlation id 3 : {staggering=LEADER_NOT_AVAILABLE}
2016-07-29 11:24:02.699 WARN 1160 --- [afka-consumer-1] org.apache.kafka.clients.NetworkClient : Error while fetching metadata with correlation id 3 : {moa=LEADER_NOT_AVAILABLE}
2016-07-29 11:24:02.699 WARN 1160 --- [afka-consumer-1] org.apache.kafka.clients.NetworkClient : Error while fetching metadata with correlation id 3 : {deliveryZipCode_v1=LEADER_NOT_AVAILABLE}
2016-07-29 11:24:02.807 WARN 1160 --- [afka-consumer-1] org.apache.kafka.clients.NetworkClient : Error while fetching metadata with correlation id 5 : {deliveryZipCode_v1=LEADER_NOT_AVAILABLE}
2016-07-29 11:24:02.811 WARN 1160 --- [afka-consumer-1] org.apache.kafka.clients.NetworkClient : Error while fetching metadata with correlation id 5 : {staggering=LEADER_NOT_AVAILABLE}
2016-07-29 11:24:02.812 WARN 1160 --- [afka-consumer-1] org.apache.kafka.clients.NetworkClient : Error while fetching metadata with correlation id 5 : {moa=LEADER_NOT_AVAILABLE}
2016-07-29 11:24:03.544 INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer : partitions revoked:[]
2016-07-29 11:24:03.544 INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer : partitions revoked:[]
2016-07-29 11:24:03.544 INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer : partitions revoked:[]
2016-07-29 11:24:03.602 INFO 1160 --- [afka-consumer-1] o.a.k.c.c.internals.AbstractCoordinator : SyncGroup for group timeslot-service-group-06x failed due to coordinator rebalance, rejoining the group
2016-07-29 11:24:03.637 INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned:[]
2016-07-29 11:24:03.637 INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned:[]
2016-07-29 11:24:04.065 INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned:[staggering-0]
2016-07-29 11:24:04.066 INFO 1160 --- [ main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 50810 (http)
2016-07-29 11:24:04.073 INFO 1160 --- [ main] .t.s.AllocationsDeliveryZonesServiceSpec : Started AllocationsDeliveryZonesServiceSpec in 20.616 seconds (JVM running for 25.456)
2016-07-29 11:24:04.237 INFO 1160 --- [ main] org.eclipse.jetty.server.Server : jetty-9.2.17.v20160517
2016-07-29 11:24:04.265 INFO 1160 --- [ main] o.e.jetty.server.handler.ContextHandler : Started o.e.j.s.ServletContextHandler@6a8598e7{/__admin,null,AVAILABLE}
2016-07-29 11:24:04.270 INFO 1160 --- [ main] o.e.jetty.server.handler.ContextHandler : Started o.e.j.s.ServletContextHandler@104ea372{/,null,AVAILABLE}
2016-07-29 11:24:04.279 INFO 1160 --- [ main] o.eclipse.jetty.server.ServerConnector : Started ServerConnector@3c9b416a{HTTP/1.1}{0.0.0.0:50811}
2016-07-29 11:24:04.430 INFO 1160 --- [ main] o.eclipse.jetty.server.ServerConnector : Started ServerConnector@7c214597{SSL-http/1.1}{0.0.0.0:50812}
2016-07-29 11:24:04.430 INFO 1160 --- [ main] org.eclipse.jetty.server.Server : Started @25813ms
2016-07-29 11:24:04.632 INFO 1160 --- [ main] .t.s.AllocationsDeliveryZonesServiceSpec : waiting...
2016-07-29 11:24:04.662 INFO 1160 --- [ main] .t.s.AllocationsDeliveryZonesServiceSpec : Waiting for [moa] to connect to 2 partitions.^
2016-07-29 11:24:13.644 INFO 1160 --- [afka-consumer-1] o.a.k.c.c.internals.AbstractCoordinator : Attempt to heart beat failed since the group is rebalancing, try to re-join group.
2016-07-29 11:24:13.644 INFO 1160 --- [afka-consumer-1] o.a.k.c.c.internals.AbstractCoordinator : Attempt to heart beat failed since the group is rebalancing, try to re-join group.
2016-07-29 11:24:13.644 INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer : partitions revoked:[]
2016-07-29 11:24:13.644 INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer : partitions revoked:[]
2016-07-29 11:24:13.655 INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned:[staggering-0]
2016-07-29 11:24:13.655 INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned:[moa-0]
2016-07-29 11:24:13.655 INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned:[deliveryZipCode_v1-0]
2016-07-29 11:24:13.740 INFO 1160 --- [ main] .t.s.AllocationsDeliveryZonesServiceSpec : moa:0;
[...]
2016-07-29 11:24:16.644 INFO 1160 --- [ main] .t.s.AllocationsDeliveryZonesServiceSpec : moa:0;
2016-07-29 11:24:16.666 INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer : partitions revoked:[staggering-0]
2016-07-29 11:24:16.750 INFO 1160 --- [ main] .t.s.AllocationsDeliveryZonesServiceSpec : moa:0;
[...]
2016-07-29 11:24:23.559 INFO 1160 --- [ main] .t.s.AllocationsDeliveryZonesServiceSpec : moa:0;
2016-07-29 11:24:23.660 INFO 1160 --- [afka-consumer-1] o.a.k.c.c.internals.AbstractCoordinator : Attempt to heart beat failed since the group is rebalancing, try to re-join group.
2016-07-29 11:24:23.660 INFO 1160 --- [afka-consumer-1] o.a.k.c.c.internals.AbstractCoordinator : Attempt to heart beat failed since the group is rebalancing, try to re-join group.
2016-07-29 11:24:23.662 INFO 1160 --- [ main] .t.s.AllocationsDeliveryZonesServiceSpec : moa:0;
2016-07-29 11:24:23.686 INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer : partitions revoked:[moa-0]
2016-07-29 11:24:23.686 INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer : partitions revoked:[deliveryZipCode_v1-0]
2016-07-29 11:24:23.695 INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned:[moa-0]
2016-07-29 11:24:23.695 INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned:[staggering-0]
2016-07-29 11:24:23.695 INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned:[deliveryZipCode_v1-0]
Please note the [..] indication omitted lines
We set metadata.max.age.ms
to 3000 ms
As a result it tries to refresh the metadata information frequently.
What puzzles us now is, that if we wait for two partitions to connect, the wait will time out. Only if we wait for one partition to connect, after a while everything runs successfully.
Did we understand the code wrong, that there are two partitions per topic in the embedded Kafka? Is it normal that only one is assigned to our Listeners?