
I'm using Spring Boot 2.2.4.RELEASE and spring-kafka 2.4.2.RELEASE.

My scenario is the following:

In my microservice (let's call it the producer microservice) I need to create a Kafka topic and then, in some circumstances, send a message on that single topic.

This message must be received and handled by another microservice (let's call it the consumer microservice). In this consumer microservice I must create a Kafka listener every time a new topic is created on the server side.

So I wrote the following code.

producer microservice

spring kafka config:

@Configuration
public class WebmailKafkaConfig {
    @Autowired
    private Environment environment;
    @Bean
    public KafkaAdmin kafkaAdmin(){
        Map<String, Object> configuration = new HashMap<String, Object>();
        configuration.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, environment.getProperty("webmail.be.messaging.kafka.bootstrap.address"));
        return new KafkaAdmin(configuration);
    }
    @Bean
    public ProducerFactory<String, RicezioneMailMessage> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put( ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, environment.getProperty("webmail.be.messaging.kafka.bootstrap.address"));
        configProps.put( ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        //configProps.put( ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put( ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean("ricezioneMailMessageKafkaTemplate")
    public KafkaTemplate<String, RicezioneMailMessage> ricezioneMailMessageKafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
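
For reference, the property keys referenced in this class and in the service below would live in application.properties. A minimal sketch; the values are illustrative, since the original post does not show them:

webmail.be.messaging.kafka.bootstrap.address=localhost:9092
webmail.be.messaging.kafka.topic.numero.partizioni=1
webmail.be.messaging.kafka.topic.fattore.replica=1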

Spring service that manages Kafka topics and messages:

@Service
public class WebmailKafkaTopicSvcImpl implements WebmailKafkaTopicSvc {
    private static final Logger logger = LoggerFactory.getLogger(WebmailKafkaTopicSvcImpl.class.getName());
    @Autowired
    private KafkaAdmin kafkaAdmin;
    @Value("${webmail.be.messaging.kafka.topic.numero.partizioni}")
    private int numeroPartizioni;
    @Value("${webmail.be.messaging.kafka.topic.fattore.replica}")
    private short fattoreReplica;
    @Autowired
    @Qualifier("ricezioneMailMessageKafkaTemplate")
    private KafkaTemplate<String, RicezioneMailMessage> ricezioneMailMessageKafkaTemplate;
    @Override
    public void createKafkaTopic(String topicName) throws Exception {
        if(!StringUtils.hasText(topicName)){
            throw new IllegalArgumentException("Passato un topic name non valido ["+topicName+"]");
        }
        AdminClient adminClient = null;
        try{
            adminClient = AdminClient.create(kafkaAdmin.getConfig());
            List<NewTopic> topics = new ArrayList<>(1);
            NewTopic topic = new NewTopic(topicName, numeroPartizioni, fattoreReplica);
            topics.add(topic);
            CreateTopicsResult result = adminClient.createTopics(topics);
            result.all().whenComplete((aVoid, throwable) -> {
                if (throwable != null) {
                    logger.error("Errore creazione topic", throwable);
                }
            });

        }finally {
            if( adminClient != null ){
                adminClient.close();
            }
        }
    }

    @Override
    public void sendMessage(RicezioneMailMessage rmm) throws Exception {
        ListenableFuture<SendResult<String, RicezioneMailMessage>> future = ricezioneMailMessageKafkaTemplate.send(rmm.getPk(), rmm);
        future.addCallback(new ListenableFutureCallback<SendResult<String, RicezioneMailMessage>>() {
            @Override
            public void onFailure(Throwable ex) {
                if( logger.isWarnEnabled() ){
                    logger.warn("Impossibile inviare il messaggio=["
                            + rmm + "] a causa di : " + ex.getMessage(),ex);
                }
            }

            @Override
            public void onSuccess(SendResult<String, RicezioneMailMessage> result) {
                if(logger.isTraceEnabled()){
                    logger.trace("Inviato messaggio=[" + rmm +
                            "] con offset=[" + result.getRecordMetadata().offset() + "]");
                }
            }
        });
    }
}
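
As an aside, and not related to the problem below: for topics whose names are known at startup, spring-kafka can create them declaratively, because KafkaAdmin creates any NewTopic beans it finds when the application starts. A sketch with a placeholder name (dynamic per-message topics like the ones here still need the AdminClient approach above):

@Bean
public NewTopic staticTopic() {
    // Placeholder topic name; partition count and replication factor are illustrative.
    return new NewTopic("nome-topic-statico", 1, (short) 1);
}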

On the producer side everything works fine: I'm able to create topics and send messages.

consumer microservice

dynamic listener class

public class DynamicKafkaConsumer {
    private final String brokerAddress;
    private final String topicName;
    private volatile boolean stopTest; // volatile: set by the listener thread, read by the test thread
    private static final Logger logger = LoggerFactory.getLogger(DynamicKafkaConsumer.class.getName());
    public DynamicKafkaConsumer(String brokerAddress, String topicName) {
        if( !StringUtils.hasText(brokerAddress)){
            throw new IllegalArgumentException("Passato un broker address non valido");
        }
        if( !StringUtils.hasText(topicName)){
            throw new IllegalArgumentException("Passato un topicName non valido");
        }
        this.brokerAddress = brokerAddress;
        this.topicName = topicName;
        if( logger.isTraceEnabled() ){
            logger.trace("Creato {} con topicName {} e brokerAddress {}", this.getClass().getName(), this.topicName, this.brokerAddress);
        }
    }
    public final void start() {
        MessageListener<String, RicezioneMailMessage> messageListener = (record -> {
            RicezioneMailMessage messaggioRicevuto = record.value();

            if( logger.isInfoEnabled() ){
                logger.info("Ricevuto messaggio {} su topic {}", messaggioRicevuto, topicName);
            }
            stopTest = true;
        });
        ConcurrentMessageListenerContainer<String, RicezioneMailMessage> container =
                new ConcurrentMessageListenerContainer<>(
                        consumerFactory(brokerAddress),
                        containerProperties(topicName, messageListener));

        container.start();
    }
    private DefaultKafkaConsumerFactory<String, RicezioneMailMessage> consumerFactory(String brokerAddress) {
        return new DefaultKafkaConsumerFactory<>(
                consumerConfig(brokerAddress),
                new StringDeserializer(),
                new JsonDeserializer<>(RicezioneMailMessage.class));
    }

    private ContainerProperties containerProperties(String topic, MessageListener<String, RicezioneMailMessage> messageListener) {
        ContainerProperties containerProperties = new ContainerProperties(topic);
        containerProperties.setMessageListener(messageListener);
        return containerProperties;
    }

    private Map<String, Object> consumerConfig(String brokerAddress) {
        return Map.of(
                BOOTSTRAP_SERVERS_CONFIG, brokerAddress,
                GROUP_ID_CONFIG, "groupId",
                AUTO_OFFSET_RESET_CONFIG, "earliest",
                ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false"
        );
    }

    public boolean isStopTest() {
        return stopTest;
    }

    public void setStopTest(boolean stopTest) {
        this.stopTest = stopTest;
    }
}

simple unit test

public class TestRicezioneMessaggiCasellaPostale {
    private static final Logger logger = LoggerFactory.getLogger(TestRicezioneMessaggiCasellaPostale.class.getName());

    @Test
    public void testRicezioneMessaggiMail() {
        try {

            String brokerAddress = "localhost:9092";
            DynamicKafkaConsumer consumer = new DynamicKafkaConsumer(brokerAddress, "f586caf2-ffdc-4e3a-88b9-a262a502f8ac");
            consumer.start();
            boolean stopTest = consumer.isStopTest();
            while (!stopTest) {

                stopTest = consumer.isStopTest();
            }
        } catch (Exception e) {
            logger.error("Errore nella configurazione della casella postale; {}", e.getMessage(), e);
        }
    }
}
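
Note that the while loop above spins on a flag; with stopTest marked volatile it works, but a CountDownLatch would block without burning CPU and gives the test a timeout. A sketch, assuming a hypothetical DynamicKafkaConsumer variant whose listener counts the latch down instead of setting the flag (imports: java.util.concurrent.CountDownLatch, java.util.concurrent.TimeUnit):

CountDownLatch received = new CountDownLatch(1);
// Hypothetical constructor overload taking the latch; the class shown above
// would need to call received.countDown() in the message listener.
DynamicKafkaConsumer consumer = new DynamicKafkaConsumer(brokerAddress, topicName, received);
consumer.start();
// Block for at most 30 seconds instead of spinning; fail the test on timeout.
assertTrue(received.await(30, TimeUnit.SECONDS));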

On the consumer side I can't read any message; note that the topic "f586caf2-ffdc-4e3a-88b9-a262a502f8ac" exists and it's the same topic used on the producer side.

When I send a message on the producer side I can see this log:

2020-02-19 22:00:22,320 52822 [kafka-producer-network-thread | producer-1] TRACE i.e.t.r.p.w.b.s.i.WebmailKafkaTopicSvcImpl - Inviato messaggio=[RicezioneMailMessage{pk='c5c8f8a4-8ddd-407a-9e51-f6b14d84f304', tipoMessaggio='mail'}] con offset=[0]

On the consumer side I don't see any message; I only see the following log output:

2020-02-19 22:00:03,194 1442 [main] INFO  o.a.k.c.consumer.ConsumerConfig - ConsumerConfig values:      
allow.auto.create.topics = false    
auto.commit.interval.ms = 5000  
auto.offset.reset = earliest    
bootstrap.servers = [localhost:9092]    
check.crcs = true   
client.dns.lookup = default     
client.id =     
client.rack =   
connections.max.idle.ms = 540000    
default.api.timeout.ms = 60000  
enable.auto.commit = false  
exclude.internal.topics = true  
fetch.max.bytes = 52428800  
fetch.max.wait.ms = 500     
fetch.min.bytes = 1     
group.id = groupId  
group.instance.id = null    
heartbeat.interval.ms = 3000    
interceptor.classes = []    
internal.leave.group.on.close = true    
isolation.level = read_uncommitted  
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer 
max.partition.fetch.bytes = 1048576     
max.poll.interval.ms = 300000   
max.poll.records = 500  
metadata.max.age.ms = 300000    
metric.reporters = []   
metrics.num.samples = 2     
metrics.recording.level = INFO  
metrics.sample.window.ms = 30000    
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]     
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000     
reconnect.backoff.ms = 50   
request.timeout.ms = 30000  
retry.backoff.ms = 100  
sasl.client.callback.handler.class = null   
sasl.jaas.config = null     
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null   
sasl.kerberos.ticket.renew.jitter = 0.05    
sasl.kerberos.ticket.renew.window.factor = 0.8  
sasl.login.callback.handler.class = null    
sasl.login.class = null     
sasl.login.refresh.buffer.seconds = 300     
sasl.login.refresh.min.period.seconds = 60  
sasl.login.refresh.window.factor = 0.8  
sasl.login.refresh.window.jitter = 0.05     
sasl.mechanism = GSSAPI     
security.protocol = PLAINTEXT   
security.providers = null   
send.buffer.bytes = 131072  
session.timeout.ms = 10000  
ssl.cipher.suites = null    
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]   
ssl.endpoint.identification.algorithm = https   
ssl.key.password = null     
ssl.keymanager.algorithm = SunX509  
ssl.keystore.location = null    
ssl.keystore.password = null    
ssl.keystore.type = JKS     
ssl.protocol = TLS  
ssl.provider = null     
ssl.secure.random.implementation = null     
ssl.trustmanager.algorithm = PKIX   
ssl.truststore.location = null  
ssl.truststore.password = null  
ssl.truststore.type = JKS   
value.deserializer = class org.springframework.kafka.support.serializer.JsonDeserializer   
2020-02-19 22:00:03,630 1878 [main] INFO  o.a.k.common.utils.AppInfoParser - Kafka version: 2.4.0  
2020-02-19 22:00:03,630 1878 [main] INFO  o.a.k.common.utils.AppInfoParser - Kafka commitId: 77a89fcf8d7fa018  
2020-02-19 22:00:03,630 1878 [main] INFO  o.a.k.common.utils.AppInfoParser - Kafka startTimeMs: 1582146003626  
2020-02-19 22:00:03,636 1884 [main] INFO  o.a.k.c.consumer.KafkaConsumer - [Consumer clientId=consumer-groupId-1, groupId=groupId] Subscribed to topic(s): f586caf2-ffdc-4e3a-88b9-a262a502f8ac  
2020-02-19 22:00:03,645 1893 [main] INFO  o.s.s.c.ThreadPoolTaskScheduler - Initializing ExecutorService  
2020-02-19 22:00:03,667 1915 [consumer-0-C-1] DEBUG o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}  
2020-02-19 22:00:04,123 2371 [consumer-0-C-1] INFO  org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-groupId-1, groupId=groupId] Cluster ID: hOOJH-WNTNiXD4il0Y7_0Q  
2020-02-19 22:00:05,052 3300 [consumer-0-C-1] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-groupId-1, groupId=groupId] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)  
2020-02-19 22:00:05,059 3307 [consumer-0-C-1] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-groupId-1, groupId=groupId] (Re-)joining group  
2020-02-19 22:00:05,116 3364 [consumer-0-C-1] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-groupId-1, groupId=groupId] (Re-)joining group  
2020-02-19 22:00:05,154 3402 [consumer-0-C-1] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-groupId-1, groupId=groupId] Finished assignment for group at generation 1: {consumer-groupId-1-41df9153-7c33-46b1-8274-2d7ee2bfb35c=org.apache.kafka.clients.consumer.ConsumerPartitionAssignor$Assignment@a95df1b} 
2020-02-19 22:00:05,327 3575 [consumer-0-C-1] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-groupId-1, groupId=groupId] Successfully joined group with generation 1  
2020-02-19 22:00:05,335 3583 [consumer-0-C-1] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-groupId-1, groupId=groupId] Adding newly assigned partitions: f586caf2-ffdc-4e3a-88b9-a262a502f8ac-0  
2020-02-19 22:00:05,363 3611 [consumer-0-C-1] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-groupId-1, groupId=groupId] Found no committed offset for partition f586caf2-ffdc-4e3a-88b9-a262a502f8ac-0  
2020-02-19 22:00:05,401 3649 [consumer-0-C-1] INFO  o.a.k.c.c.i.SubscriptionState - [Consumer clientId=consumer-groupId-1, groupId=groupId] Resetting offset for partition f586caf2-ffdc-4e3a-88b9-a262a502f8ac-0 to offset 0.  
2020-02-19 22:00:05,404 3652 [consumer-0-C-1] DEBUG o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Committing on assignment: {f586caf2-ffdc-4e3a-88b9-a262a502f8ac-0=OffsetAndMetadata{offset=0, leaderEpoch=null, metadata=''}}  
2020-02-19 22:00:05,432 3680 [consumer-0-C-1] INFO  o.s.k.l.ConcurrentMessageListenerContainer - groupId: partitions assigned: [f586caf2-ffdc-4e3a-88b9-a262a502f8ac-0] 
2020-02-19 22:00:08,669 6917 [consumer-0-C-1] DEBUG o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records  
2020-02-19 22:00:08,670 6918 [consumer-0-C-1] DEBUG o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}  
2020-02-19 22:00:13,671 11919 [consumer-0-C-1] DEBUG o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records  
2020-02-19 22:00:13,671 11919 [consumer-0-C-1] DEBUG o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}  
2020-02-19 22:00:18,673 16921 [consumer-0-C-1] DEBUG o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records  
2020-02-19 22:00:18,673 16921 [consumer-0-C-1] DEBUG o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}  
2020-02-19 22:00:23,674 21922 [consumer-0-C-1] DEBUG o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records  
2020-02-19 22:00:23,674 21922 [consumer-0-C-1] DEBUG o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}  
2020-02-19 22:00:28,676 26924 [consumer-0-C-1] DEBUG o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records  
2020-02-19 22:00:28,676 26924 [consumer-0-C-1] DEBUG o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}  
2020-02-19 22:00:33,677 31925 [consumer-0-C-1] DEBUG o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records  
2020-02-19 22:00:33,677 31925 [consumer-0-C-1] DEBUG o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}  
2020-02-19 22:00:38,678 36926 [consumer-0-C-1] DEBUG o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records  
2020-02-19 22:00:38,678 36926 [consumer-0-C-1] DEBUG o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}  
2020-02-19 22:00:43,678 41926 [consumer-0-C-1] DEBUG o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Received: 0 records  
2020-02-19 22:00:43,679 41927 [consumer-0-C-1] DEBUG o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer - Commit list: {}

Can anybody tell me where I'm going wrong?

Thank you

Angelo

Your formatting is very hard to read; why not copy/paste from the log without squashing the lines together? Simply put three back-ticks before and after the log. Your configuration looks OK, especially AUTO_OFFSET_RESET_CONFIG, "earliest"; that's the most common error. Not related to your problem, but regarding GROUP_ID_CONFIG, "groupId": I suggest you use a different group.id for each consumer, otherwise you will get unnecessary rebalances. Use the kafka-consumer-groups CLI tool to examine the group offsets for each topic. (Gary Russell)
@GaryRussell I formatted the logs as you suggested. I'll try kafka-consumer-groups and I'll let you know. Thank you :) (Angelo Immediata)
>Committing on assignment: {f586caf2-ffdc-4e3a-88b9-a262a502f8ac-0=OffsetAndMetadata{offset=0... — it looks like the consumer is correctly positioned at the start of the topic/partition; are you sure the topic name is correct and there is actually data there? (Gary Russell)
About the topic name I'm really sure. At this point I absolutely need to check the data. Tomorrow morning I'll take a look... thank you for the support. (Angelo Immediata)
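
For the offsets check suggested above, Kafka ships a kafka-consumer-groups script in its bin directory; a sketch, assuming the broker address used in the test:

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group groupId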

1 Answer


I found the root cause of your problem.

Code used to send the message, and the corresponding log on the producer side:

ricezioneMailMessageKafkaTemplate.send(rmm.getPk(), rmm);

Inviato messaggio=[RicezioneMailMessage{pk='c5c8f8a4-8ddd-407a-9e51-f6b14d84f304', tipoMessaggio='mail'}] con offset=[0]

Code and log from the consumer side:

DynamicKafkaConsumer consumer = new DynamicKafkaConsumer(brokerAddress, "f586caf2-ffdc-4e3a-88b9-a262a502f8ac");

2020-02-19 22:00:03,636 1884 [main] INFO  o.a.k.c.consumer.KafkaConsumer - [Consumer clientId=consumer-groupId-1, groupId=groupId] Subscribed to topic(s): f586caf2-ffdc-4e3a-88b9-a262a502f8ac

You are sending to topic: c5c8f8a4-8ddd-407a-9e51-f6b14d84f304

You are listening on topic: f586caf2-ffdc-4e3a-88b9-a262a502f8ac

The producer and the consumer are sending and listening on different topics: the send(data) overload you are calling treats rmm.getPk() as the topic name.
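
A minimal sketch of a fix on the producer side, using the KafkaTemplate.send(topic, key, data) overload that takes an explicit topic; "topicName" here is an assumption, namely the same name you passed to createKafkaTopic(...):

// Pass the topic name explicitly; the pk then serves only as the record key.
ListenableFuture<SendResult<String, RicezioneMailMessage>> future =
        ricezioneMailMessageKafkaTemplate.send(topicName, rmm.getPk(), rmm);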