1 vote

We are observing strange behavior in our service test with embedded Kafka.

The test is a Spock test; we use the JUnit rule KafkaEmbedded and propagate brokersAsString as follows:

@ClassRule
@Shared
KafkaEmbedded embeddedKafka = new KafkaEmbedded(1)

@Autowired
KafkaListenerEndpointRegistry endpointRegistry

def setupSpec() {
    System.setProperty("kafka.bootstrapServers",  embeddedKafka.getBrokersAsString())
}

From inspecting the code of KafkaEmbedded, constructing an instance with KafkaEmbedded(int count) leads to one Kafka server with two partitions per topic.

In order to tackle issues with partition assignment and server-client synchronization in the test, we follow the strategy used by the ContainerTestUtils class from spring-kafka:

public static void waitForAssignment(KafkaMessageListenerContainer<String, String> container, int partitions)
        throws Exception {

    log.info("Waiting for " + container.getContainerProperties().getTopics() +
        " to connect to " + partitions + " partitions.")

    int n = 0
    int count = 0
    while (n++ < 600 && count < partitions) {
        count = 0
        // Null-safe iteration: getAssignedPartitions() may return null
        // before the first rebalance completes.
        container.getAssignedPartitions()?.each { TopicPartition it ->
            log.info(it.topic() + ":" + it.partition() + "; ")
        }

        if (container.getAssignedPartitions() != null) {
            count = container.getAssignedPartitions().size()
        }
        if (count < partitions) {
            Thread.sleep(100)
        }
    }
}
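The polling loop above can be reduced to a small, dependency-free sketch (the Supplier here stands in for the listener container; the names are ours, not spring-kafka's). It also shows the null check happening before the partition list is touched:

```java
import java.util.List;
import java.util.function.Supplier;

public class WaitUtil {

    // Simplified stand-in for ContainerTestUtils.waitForAssignment: polls until
    // the supplier reports at least `expected` assigned partitions or the
    // timeout elapses. Returns whether the assignment was seen in time.
    static boolean waitForAssignment(Supplier<List<String>> assignedPartitions,
                                     int expected, long timeoutMs) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            List<String> parts = assignedPartitions.get();
            // Null-check BEFORE touching the list: the real container returns
            // null until the first rebalance completes.
            if (parts != null && parts.size() >= expected) {
                return true;
            }
            Thread.sleep(10);
        }
        return false;
    }

    public static void main(String[] args) throws InterruptedException {
        // Stub "container" that reports a single assigned partition, as in the logs.
        Supplier<List<String>> oneAssigned = () -> List.of("moa-0");
        System.out.println(waitForAssignment(oneAssigned, 1, 500));  // true
        System.out.println(waitForAssignment(oneAssigned, 2, 200));  // false: times out
    }
}
```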

When we observe the logs, we notice the following pattern:

2016-07-29 11:24:02.600  WARN 1160 --- [afka-consumer-1] org.apache.kafka.clients.NetworkClient   : Error while fetching metadata with correlation id 1 : {deliveryZipCode_v1=LEADER_NOT_AVAILABLE}
2016-07-29 11:24:02.600  WARN 1160 --- [afka-consumer-1] org.apache.kafka.clients.NetworkClient   : Error while fetching metadata with correlation id 1 : {staggering=LEADER_NOT_AVAILABLE}
2016-07-29 11:24:02.600  WARN 1160 --- [afka-consumer-1] org.apache.kafka.clients.NetworkClient   : Error while fetching metadata with correlation id 1 : {moa=LEADER_NOT_AVAILABLE}
2016-07-29 11:24:02.696  WARN 1160 --- [afka-consumer-1] org.apache.kafka.clients.NetworkClient   : Error while fetching metadata with correlation id 3 : {staggering=LEADER_NOT_AVAILABLE}
2016-07-29 11:24:02.699  WARN 1160 --- [afka-consumer-1] org.apache.kafka.clients.NetworkClient   : Error while fetching metadata with correlation id 3 : {moa=LEADER_NOT_AVAILABLE}
2016-07-29 11:24:02.699  WARN 1160 --- [afka-consumer-1] org.apache.kafka.clients.NetworkClient   : Error while fetching metadata with correlation id 3 : {deliveryZipCode_v1=LEADER_NOT_AVAILABLE}
2016-07-29 11:24:02.807  WARN 1160 --- [afka-consumer-1] org.apache.kafka.clients.NetworkClient   : Error while fetching metadata with correlation id 5 : {deliveryZipCode_v1=LEADER_NOT_AVAILABLE}
2016-07-29 11:24:02.811  WARN 1160 --- [afka-consumer-1] org.apache.kafka.clients.NetworkClient   : Error while fetching metadata with correlation id 5 : {staggering=LEADER_NOT_AVAILABLE}
2016-07-29 11:24:02.812  WARN 1160 --- [afka-consumer-1] org.apache.kafka.clients.NetworkClient   : Error while fetching metadata with correlation id 5 : {moa=LEADER_NOT_AVAILABLE}
2016-07-29 11:24:03.544  INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked:[]
2016-07-29 11:24:03.544  INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked:[]
2016-07-29 11:24:03.544  INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked:[]
2016-07-29 11:24:03.602  INFO 1160 --- [afka-consumer-1] o.a.k.c.c.internals.AbstractCoordinator  : SyncGroup for group timeslot-service-group-06x failed due to coordinator rebalance, rejoining the group
2016-07-29 11:24:03.637  INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned:[]
2016-07-29 11:24:03.637  INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned:[]
2016-07-29 11:24:04.065  INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned:[staggering-0]
2016-07-29 11:24:04.066  INFO 1160 --- [           main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 50810 (http)
2016-07-29 11:24:04.073  INFO 1160 --- [           main] .t.s.AllocationsDeliveryZonesServiceSpec : Started AllocationsDeliveryZonesServiceSpec in 20.616 seconds (JVM running for 25.456)
2016-07-29 11:24:04.237  INFO 1160 --- [           main] org.eclipse.jetty.server.Server          : jetty-9.2.17.v20160517
2016-07-29 11:24:04.265  INFO 1160 --- [           main] o.e.jetty.server.handler.ContextHandler  : Started o.e.j.s.ServletContextHandler@6a8598e7{/__admin,null,AVAILABLE}
2016-07-29 11:24:04.270  INFO 1160 --- [           main] o.e.jetty.server.handler.ContextHandler  : Started o.e.j.s.ServletContextHandler@104ea372{/,null,AVAILABLE}
2016-07-29 11:24:04.279  INFO 1160 --- [           main] o.eclipse.jetty.server.ServerConnector   : Started ServerConnector@3c9b416a{HTTP/1.1}{0.0.0.0:50811}
2016-07-29 11:24:04.430  INFO 1160 --- [           main] o.eclipse.jetty.server.ServerConnector   : Started ServerConnector@7c214597{SSL-http/1.1}{0.0.0.0:50812}
2016-07-29 11:24:04.430  INFO 1160 --- [           main] org.eclipse.jetty.server.Server          : Started @25813ms
2016-07-29 11:24:04.632  INFO 1160 --- [           main] .t.s.AllocationsDeliveryZonesServiceSpec : waiting...
2016-07-29 11:24:04.662  INFO 1160 --- [           main] .t.s.AllocationsDeliveryZonesServiceSpec : Waiting for [moa] to connect to 2 partitions.
2016-07-29 11:24:13.644  INFO 1160 --- [afka-consumer-1] o.a.k.c.c.internals.AbstractCoordinator  : Attempt to heart beat failed since the group is rebalancing, try to re-join group.
2016-07-29 11:24:13.644  INFO 1160 --- [afka-consumer-1] o.a.k.c.c.internals.AbstractCoordinator  : Attempt to heart beat failed since the group is rebalancing, try to re-join group.
2016-07-29 11:24:13.644  INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked:[]
2016-07-29 11:24:13.644  INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked:[]
2016-07-29 11:24:13.655  INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned:[staggering-0]
2016-07-29 11:24:13.655  INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned:[moa-0]
2016-07-29 11:24:13.655  INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned:[deliveryZipCode_v1-0]
2016-07-29 11:24:13.740  INFO 1160 --- [           main] .t.s.AllocationsDeliveryZonesServiceSpec : moa:0;
[...]
2016-07-29 11:24:16.644  INFO 1160 --- [           main] .t.s.AllocationsDeliveryZonesServiceSpec : moa:0;
2016-07-29 11:24:16.666  INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked:[staggering-0]
2016-07-29 11:24:16.750  INFO 1160 --- [           main] .t.s.AllocationsDeliveryZonesServiceSpec : moa:0;
[...]
2016-07-29 11:24:23.559  INFO 1160 --- [           main] .t.s.AllocationsDeliveryZonesServiceSpec : moa:0;
2016-07-29 11:24:23.660  INFO 1160 --- [afka-consumer-1] o.a.k.c.c.internals.AbstractCoordinator  : Attempt to heart beat failed since the group is rebalancing, try to re-join group.
2016-07-29 11:24:23.660  INFO 1160 --- [afka-consumer-1] o.a.k.c.c.internals.AbstractCoordinator  : Attempt to heart beat failed since the group is rebalancing, try to re-join group.
2016-07-29 11:24:23.662  INFO 1160 --- [           main] .t.s.AllocationsDeliveryZonesServiceSpec : moa:0;
2016-07-29 11:24:23.686  INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked:[moa-0]
2016-07-29 11:24:23.686  INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked:[deliveryZipCode_v1-0]
2016-07-29 11:24:23.695  INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned:[moa-0]
2016-07-29 11:24:23.695  INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned:[staggering-0]
2016-07-29 11:24:23.695  INFO 1160 --- [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned:[deliveryZipCode_v1-0]

Please note that [...] indicates omitted lines.

We set metadata.max.age.ms to 3000 ms; as a result, the client tries to refresh its metadata information frequently.
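For reference, a minimal sketch of how this property can be passed into the consumer configuration. Plain string keys are used here to stay dependency-free; with the Kafka client on the classpath you would use the corresponding ConsumerConfig constants instead. The method and class names are ours:

```java
import java.util.HashMap;
import java.util.Map;

public class TestConsumerProps {

    // Builds the extra consumer properties used in the test; the broker list
    // comes from the system property set in setupSpec().
    public static Map<String, Object> consumerProps(String brokers) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", brokers);   // ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
        props.put("metadata.max.age.ms", 3000);    // ConsumerConfig.METADATA_MAX_AGE_CONFIG
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps("localhost:9092").get("metadata.max.age.ms"));
    }
}
```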

What puzzles us now is that if we wait for two partitions to connect, the wait times out. Only if we wait for one partition to connect does everything eventually run successfully.

Did we misunderstand the code, i.e. are there really two partitions per topic in the embedded Kafka? Is it normal that only one of them is assigned to our listeners?

I noticed similar flakiness using the Spring embedded Kafka library. When we have multiple consumers joining the same group in the test, the test fails (not always, around 10% of the time) when a rebalance happens, which seems to align with the logs here. – Zhongxia Zhou

2 Answers

0 votes

I can't explain the flakiness you're seeing; yes, each topic gets 2 partitions by default. I just ran one of the framework container tests and see this...

09:24:06.139 INFO  [testSlow3-kafka-consumer-1][org.springframework.kafka.listener.KafkaMessageListenerContainer] partitions revoked:[]
09:24:06.611 INFO  [testSlow3-kafka-consumer-1][org.springframework.kafka.listener.KafkaMessageListenerContainer] partitions assigned:[testTopic3-1, testTopic3-0]
0 votes

For testing, it is important to set spring.kafka.consumer.auto-offset-reset=earliest to avoid a race condition between when the consumer starts and when the producer sends; see https://docs.spring.io/spring-kafka/reference/html/#junit
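In Spring Boot application properties this is a one-line setting (the test-profile file location below is an assumption):

```properties
# src/test/resources/application.properties (assumed location)
spring.kafka.consumer.auto-offset-reset=earliest
```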

Starting with version 2.5, the consumerProps method sets the ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to earliest. This is because, in most cases, you want the consumer to consume any messages sent in a test case. The ConsumerConfig default is latest which means that messages already sent by a test, before the consumer starts, will not receive those records. To revert to the previous behavior, set the property to latest after calling the method.