I have read many articles but could not find out how to configure a Producer for a topic with multiple partitions (the topic is created at runtime) using Spring Integration Kafka.

I am following a GitHub link to understand and configure Kafka for my application.

Kindly provide a solution for this.

One more thing: what is the use of kafkaHeader.messageKey?

Update: below is the test code I developed.
` public class OutboundTests {
    @Test
    public void mytest() throws Exception {

    KafkaProducerContext<String, SMSNotificationVO> ctx = new KafkaProducerContext<String, SMSNotificationVO>();
    ProducerMetadata<String, SMSNotificationVO> meta = new ProducerMetadata<String, SMSNotificationVO>("sms_topic");
    meta.setValueClassType(SMSNotificationVO.class);
    meta.setKeyClassType(String.class);
    meta.setValueEncoder(new SMSObjectSerializer());

    ProducerMetadata<String, SMSNotificationVO> meta1 = new ProducerMetadata<String, SMSNotificationVO>("sms_topic_10");
    meta1.setValueClassType(SMSNotificationVO.class);
    meta1.setKeyClassType(String.class);
    meta1.setValueEncoder(new SMSObjectSerializer());
    Properties producerProperties = new Properties();
    producerProperties.put(org.apache.kafka.clients.producer.ProducerConfig.RETRIES_CONFIG, "1");//"message.send.max.retries"
    ProducerFactoryBean<String, SMSNotificationVO> producer = new ProducerFactoryBean<String, SMSNotificationVO>(meta,"192.168.1.147:9092");
    ProducerFactoryBean<String, SMSNotificationVO> producer1 = new ProducerFactoryBean<String, SMSNotificationVO>(meta1,"192.168.1.147:9092", producerProperties);


    ProducerConfiguration<String, SMSNotificationVO> p = new ProducerConfiguration<String, SMSNotificationVO>(meta, producer.getObject());
    ProducerConfiguration<String, SMSNotificationVO> p1 = new ProducerConfiguration<String, SMSNotificationVO>(meta1, producer1.getObject());

    Map<String, ProducerConfiguration<String, SMSNotificationVO>> map = new HashMap<String, ProducerConfiguration<String, SMSNotificationVO>>();
    map.put("sms_topic", p);
    map.put("sms_topic_10", p1);

    ctx.setProducerConfigurations(map);

    // java code to send message
    Map<String, String> params = new HashMap<>();
    params.put("Topic", "TEST MULTIPLE TOPIC : this is sms_topic_1");
    List<String> smsRecipients = new ArrayList<>();
    smsRecipients.add("9953225211");


     String message = "This Test message from junit topic sms_topic_1";
     Integer messageType =1;

    SMSNotificationVO vo = new SMSNotificationVO();
    vo.setMessage(message);
    vo.setMessageType(messageType);
    vo.setParams(params);
    vo.setSmsRecipients(smsRecipients);


    try{
     KafkaProducerMessageHandler<String, SMSNotificationVO> handler = new KafkaProducerMessageHandler<String, SMSNotificationVO>(ctx);
     handler.setMessageKeyExpression(new LiteralExpression("sms_topic_10"));
     handler.handleMessage(MessageBuilder.withPayload(vo)
             // uncommenting the header lines below causes a NullPointerException
             //.setHeader(KafkaHeaders.MESSAGE_KEY, "some key")
             //.setHeader(KafkaHeaders.PARTITION_ID, "1")
                .setHeader(KafkaHeaders.TOPIC, "sms_topic_10")
                .build());

    }catch(Exception e){
        e.printStackTrace();
        Assert.fail("Kafka SMS Producer fail");
    }

}  
}`

I am getting a NullPointerException. The log is below:

`org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler@7b94089b]; nested exception is java.lang.NullPointerException
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:84)
at dd.kafka.producer.OutboundTests.mytest(OutboundTests.java:85)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:86)
at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:675)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192)
Caused by: java.lang.NullPointerException
at kafka.producer.async.DefaultEventHandler$$anonfun$serialize$1.apply(DefaultEventHandler.scala:130)
at kafka.producer.async.DefaultEventHandler$$anonfun$serialize$1.apply(DefaultEventHandler.scala:127)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
at kafka.producer.async.DefaultEventHandler.serialize(DefaultEventHandler.scala:127)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:53)
at kafka.producer.Producer.send(Producer.scala:77)
at kafka.javaapi.producer.Producer.send(Producer.scala:33)
at org.springframework.integration.kafka.support.ProducerConfiguration.send(ProducerConfiguration.java:70)
at org.springframework.integration.kafka.support.KafkaProducerContext.send(KafkaProducerContext.java:197)
at org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.handleMessageInternal(KafkaProducerMessageHandler.java:81)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
... 24 more`

Thanks

1 Answer

A topic isn't tied to the Producer. You should configure the topic in advance, before sending messages to it or receiving messages from it.

The fact that you can create a topic at runtime with AdminUtils.createTopic() through the Spring Integration high-level API is just a testing convenience and should be avoided in production.

The KafkaHeaders.messageKey is mapped directly to the standard Kafka messageKey and, yes, it can be used to determine the target partition in the topic.
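
To illustrate how a key can pick a partition, here is a minimal sketch of hash-based partitioning in plain Java. It is a simplified illustration only, not Kafka's or Spring Integration's actual code: the class `KeyPartitionSketch`, the method `partitionFor`, and the partition count of 3 are all hypothetical names and values chosen for the example. The real partitioner depends on your Kafka version and producer configuration.

```java
public class KeyPartitionSketch {

    /**
     * Hypothetical helper: maps a message key to a partition index by hashing.
     * Messages with the same key always land in the same partition, as long as
     * the number of partitions does not change.
     */
    public static int partitionFor(String key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        // Clear the sign bit so a negative hashCode cannot yield a negative index.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 3; // assumed partition count for the topic
        String[] keys = {"9953225211", "some key", "9953225211"};
        for (String key : keys) {
            // The first and third keys are equal, so they map to the same partition.
            System.out.println(key + " -> partition " + partitionFor(key, numPartitions));
        }
    }
}
```

The point of the sketch is that the producer only needs a key (or an explicit partition id) per message; how many partitions exist is a property of the topic itself.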

So you should definitely read the official Apache Kafka documentation to understand how to create a topic with multiple partitions and what to do on the Producer side with the partition and messageKey parameters.