1
votes

I am building a web application using spring boot and now I have this requirement of receiving real-time notifications. I am planning to use apache kafka as a message broker for this. The requirement is such that there are users with different roles and based on the role, they should receive notifications of what other users are doing.
I did set up a single producer and consumer and as a consumer, I could receive the information published to a topic let's say topic1.
The part where I am stuck is that I could have several users listening to the same topic and each user should get the message published to that topic. I understand that for this requirement, we need to set different group.id for each kafkalistener so that each consumer can get the message.
But how I am going to create a kafkalistener with a different group id when a user is logged in? Hope someone can provide some guidance on that? Thank you

1
Your understanding is correct.. If there are any consumer with a different group.id will receive the message separately. But from your description, not clear why would you need multiple group.id. I think your server should listen to all the messages, and should identify the correct user of that message, and have a logic to push the message to that particular user alone. (If not possible, can you elaborate a bit more ? ) - Abbin Varghese
I need multiple group.id because several users with the same roles could log in and each of these users need to get the notifications. - ashley
The group.id means an application (or a process) ID that it listening to the kafka topic, so you could have many users on the same application with the same group.id, and with that, you use one listener to receive all messages that you can filter by user's session Id . - Yassine Abainou

1 Answers

1
votes

Simply create a new KafkaMessageListenerContainer each time and start/stop it as needed.

You can use Boot's auto-configured ConcurrentKafkaListenerContainerFactory to create containers. Just set the groupId container property to make them unique.

EDIT

Here's an example:

@SpringBootApplication
public class So60150686Application {

    public static void main(String[] args) {
        SpringApplication.run(So60150686Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            template.send("so60150686", "foo");
        };
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so60150686").partitions(1).replicas(1).build();
    }

}

@RestController
class Web {

    private final ConcurrentKafkaListenerContainerFactory<String, String> factory;

    public Web(ConcurrentKafkaListenerContainerFactory<String, String> factory) {
        this.factory = factory;
    }

    @GetMapping(path="/foo/{group}")
    public String foo(@PathVariable String group) {
        ConcurrentMessageListenerContainer<String, String> container = factory.createContainer("so60150686");
        container.getContainerProperties().setGroupId(group);
        container.getContainerProperties().setMessageListener(new MessageListener<String, String>() {

            @Override
            public void onMessage(ConsumerRecord<String, String> record) {
                System.out.println(record);
            }

        });
        container.start();
        return "ok";
    }

}
spring.kafka.consumer.auto-offset-reset=earliest
$ http localhost:8080/foo/bar
HTTP/1.1 200 
Connection: keep-alive
Content-Length: 2
Content-Type: text/plain;charset=UTF-8
Date: Mon, 10 Feb 2020 19:42:02 GMT
Keep-Alive: timeout=60

ok

2020-02-10 14:42:09.744 INFO 34096 --- [ consumer-0-C-1] o.s.k.l.KafkaMessageListenerContainer : bar: partitions assigned: [so60150686-0]

ConsumerRecord(topic = so60150686, partition = 0, leaderEpoch = 0, offset = 1, CreateTime = 1581363648938, serialized key size = -1, serialized value size = 3, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = foo)