With the latest release of spring-kafka, we are trying to use request-reply semantics and would like to know whether we can use intermediate topics without losing the correlation id. One of our use cases is to receive a message from an API, produce it to topic1, send the result to topic2, process the message on topic2, and send it to topic3, where topic3 delivers the final response to the initial request from topic1.

I am not able to associate the response coming from topic3 with the request on topic1, because the correlation id gets lost in the intermediate topic. If I don't use an intermediate topic (say topic2), it works: topic1 sends a message with a correlation id and its corresponding response is received from topic3.

Any suggestions or recommendations would be greatly appreciated.

Below is sample code. From my API, I post a transaction:

public String postTransaction(String request,Map<String, String> headers) throws InterruptedException, ExecutionException {
    ProducerRecord<String,String> record=new ProducerRecord<String,String>(topic1,"300",request);
    record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC,topic3.getBytes()));
    RequestReplyFuture<String,String,String> sendAndReceive=kafkaTemplate.sendAndReceive(record);
    SendResult<String,String> requestMessage=sendAndReceive.getSendFuture().get();
    return sendAndReceive.get().value();
}

In another consumer, I listen on topic1, take the correlation id, and produce a message to topic2, which will send a reply to topic3.

public void listen(Object request, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) List<Integer> keys,
                   @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
                   @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
                   @Header(KafkaHeaders.OFFSET) List<Long> offsets,
                   @Header(KafkaHeaders.CORRELATION_ID) byte[] coRlId) throws InterruptedException {

    ProducerRecord<String,String> record=new ProducerRecord<String,String>("topic2","300",k.value());
    record.headers().add(new RecordHeader(KafkaHeaders.CORRELATION_ID,coRlId.get(0).getBytes()));

    kafkaTemplate.send(record);

}
Your intermediate application that reads from topic2 and writes to topic3 needs to propagate the KafkaHeaders.CORRELATION_ID header from the input record to the output record. - Gary Russell
I tried to propagate KafkaHeaders.CORRELATION_ID from topic2 to topic3, but somehow the id does not match the KafkaHeaders.CORRELATION_ID I set in the initial request, and I am getting a timeout error. Apologies if this is a dumb question; I am new to this area. Am I missing something here? How does the KafkaHeaders.CORRELATION_ID get changed? - Ramesh
Unfortunately, they closed this question so I can't add an example in an answer. You must be doing something wrong; if you propagate the correlation id header properly, the caller application will be able to correlate the reply, regardless of how many steps it took. - Gary Russell
I just added sample code. Can you please take a look or add an example? Thank you in advance. - Ramesh

1 Answer

I just tested it and it works just fine for me...

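import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
import org.springframework.kafka.requestreply.RequestReplyFuture;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.SendTo;

@SpringBootApplication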
public class So61152047Application {

    public static void main(String[] args) {
        SpringApplication.run(So61152047Application.class, args);
    }

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Bean
    public ReplyingKafkaTemplate<String, String, String> replyer(ProducerFactory<String, String> pf,
            ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {

        containerFactory.setReplyTemplate(kafkaTemplate(pf));
        ConcurrentMessageListenerContainer<String, String> container = replyContainer(containerFactory);
        ReplyingKafkaTemplate<String, String, String> replyer = new ReplyingKafkaTemplate<>(pf, container);
        return replyer;
    }

    @Bean
    public ConcurrentMessageListenerContainer<String, String> replyContainer(
            ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {

        ConcurrentMessageListenerContainer<String, String> container = containerFactory.createContainer("topic3");
        container.getContainerProperties().setGroupId("three");
        return container;
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> pf) {
        return new KafkaTemplate<>(pf);
    }


    @Bean
    public ApplicationRunner runner(ReplyingKafkaTemplate<String, String, String> rkt) {
        return args -> {
            ProducerRecord<String, String> pr = new ProducerRecord<>("topic1", "foo", "bar");
            RequestReplyFuture<String, String, String> future = rkt.sendAndReceive(pr);
            System.out.println(future.get(10, TimeUnit.SECONDS).value());
        };
    }

    @Bean
    public NewTopic topic1() {
        return TopicBuilder.name("topic1").partitions(1).replicas(1).build();
    }

    @Bean
    public NewTopic topic2() {
        return TopicBuilder.name("topic2").partitions(1).replicas(1).build();
    }

    @Bean
    public NewTopic topic3() {
        return TopicBuilder.name("topic3").partitions(1).replicas(1).build();
    }

    @KafkaListener(id = "one", topics = "topic1")
    public void listen1(String in,
            @Header(KafkaHeaders.CORRELATION_ID) byte[] corrId) {

        System.out.println(in);
        ProducerRecord<String, String> pr = new ProducerRecord<>("topic2", in.toUpperCase());
        pr.headers().add(new RecordHeader(KafkaHeaders.CORRELATION_ID, corrId));
        this.kafkaTemplate.send(pr);
    }

    @KafkaListener(id = "two", topics = "topic2")
    @SendTo("topic3")
    public String listen2(String in) {
        return in + in;
    }

}

and the output is:

bar
BARBAR
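
To see why copying the header is enough, regardless of how many hops there are, here is a rough plain-Java model of the correlation bookkeeping. It is only a sketch and not Spring Kafka code: the class `CorrelationModel`, the header name `correlationId`, and the 16-byte UUID layout are assumptions made for illustration.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified model of request/reply correlation; not Spring Kafka code.
final class CorrelationModel {

    // Assumed header name, used only inside this sketch.
    static final String CORRELATION_HEADER = "correlationId";

    // Pending requests keyed by correlation id; ByteBuffer compares by content.
    private final Map<ByteBuffer, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Caller side: create an id, remember the future, attach the id as a header.
    CompletableFuture<String> send(Map<String, byte[]> headers) {
        UUID id = UUID.randomUUID();
        byte[] corr = ByteBuffer.allocate(16)
                .putLong(id.getMostSignificantBits())
                .putLong(id.getLeastSignificantBits())
                .array();
        headers.put(CORRELATION_HEADER, corr);
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(ByteBuffer.wrap(corr), future);
        return future;
    }

    // Intermediate step: build the outgoing headers, copying the id unchanged.
    static Map<String, byte[]> forward(Map<String, byte[]> in) {
        Map<String, byte[]> out = new HashMap<>();
        out.put(CORRELATION_HEADER, in.get(CORRELATION_HEADER));
        return out;
    }

    // Caller side: a reply arrived; complete the matching future, if there is one.
    boolean onReply(Map<String, byte[]> headers, String value) {
        byte[] corr = headers.get(CORRELATION_HEADER);
        if (corr == null) {
            return false; // header was dropped somewhere along the way
        }
        CompletableFuture<String> future = pending.remove(ByteBuffer.wrap(corr));
        if (future == null) {
            return false; // id was altered, or the request already completed
        }
        future.complete(value);
        return true;
    }
}
```

In this model the caller only looks at the id on the final reply, so any number of `forward` steps is fine as long as each one copies the bytes as-is. A reply whose id is missing or changed is simply not matched, which from the caller's side looks like a timeout.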

You can also propagate the reply-to header(s)...

    @KafkaListener(id = "one", topics = "topic1")
    public void listen1(String in,
            @Header(KafkaHeaders.REPLY_TOPIC) byte[] replyTo,
            @Header(KafkaHeaders.CORRELATION_ID) byte[] corrId) {

        System.out.println(in);
        ProducerRecord<String, String> pr = new ProducerRecord<>("topic2", in.toUpperCase());
        pr.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, replyTo));
        pr.headers().add(new RecordHeader(KafkaHeaders.CORRELATION_ID, corrId));
        this.kafkaTemplate.send(pr);
    }

    @KafkaListener(id = "two", topics = "topic2")
    @SendTo // ("topic3")
    public String listen2(String in) {
        return in + in;
    }
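
Conceptually, the last step then picks its destination from the propagated header instead of a hard-coded topic. A minimal plain-Java sketch of that decision follows; `ReplyTopicModel`, the header name `replyTopic`, and the exception thrown when nothing is available are all assumptions for illustration, not the framework's actual code.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Hypothetical model of choosing where to send a reply; not Spring Kafka code.
final class ReplyTopicModel {

    // Assumed header name, used only inside this sketch.
    static final String REPLY_TOPIC_HEADER = "replyTopic";

    // Prefer an explicitly configured topic; otherwise fall back to the header.
    static String resolve(String explicitTopic, Map<String, byte[]> headers) {
        if (explicitTopic != null && !explicitTopic.isEmpty()) {
            return explicitTopic;
        }
        byte[] fromHeader = headers.get(REPLY_TOPIC_HEADER);
        if (fromHeader == null) {
            // Nothing configured and nothing propagated: the reply has nowhere to go.
            throw new IllegalStateException("No reply topic configured or propagated");
        }
        return new String(fromHeader, StandardCharsets.UTF_8);
    }
}
```

For example, `ReplyTopicModel.resolve("", headers)` returns the topic name carried in the header, which only works if the step before it copied that header onto the record.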

EDIT

To convey the correlation id via the payload:

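import java.util.Map;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Header;
import org.springframework.kafka.support.KafkaHeaders;

// Copies the correlation id header (if present) into the Foo payload before sending.
public class CorrelatingProducerInterceptor implements ProducerInterceptor<String, Foo> {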

    @Override
    public void configure(Map<String, ?> configs) {
    }

    @Override
    public ProducerRecord<String, Foo> onSend(ProducerRecord<String, Foo> record) {
        Header correlation = record.headers().lastHeader(KafkaHeaders.CORRELATION_ID);
        if (correlation != null) {
            record.value().setCorrelation(correlation.value());
        }
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    }

    @Override
    public void close() {
    }

}

@SpringBootApplication
public class So61152047Application {

    public static void main(String[] args) {
        SpringApplication.run(So61152047Application.class, args);
    }

    @Autowired
    private KafkaTemplate<String, Foo> kafkaTemplate;

    @Bean
    public ReplyingKafkaTemplate<String, Foo, Foo> replyer(ProducerFactory<String, Foo> pf,
            ConcurrentKafkaListenerContainerFactory<String, Foo> containerFactory) {

        containerFactory.setReplyTemplate(kafkaTemplate(pf));
        ConcurrentMessageListenerContainer<String, Foo> container = replyContainer(containerFactory);
        ReplyingKafkaTemplate<String, Foo, Foo> replyer = new ReplyingKafkaTemplate<>(pf, container);
        return replyer;
    }

    @Bean
    public ConcurrentMessageListenerContainer<String, Foo> replyContainer(
            ConcurrentKafkaListenerContainerFactory<String, Foo> containerFactory) {

        ConcurrentMessageListenerContainer<String, Foo> container = containerFactory.createContainer("topic3");
        container.getContainerProperties().setGroupId("three");
        return container;
    }

    @Bean
    public KafkaTemplate<String, Foo> kafkaTemplate(ProducerFactory<String, Foo> pf) {
        return new KafkaTemplate<>(pf);
    }

    @Bean
    public ApplicationRunner runner(ReplyingKafkaTemplate<String, Foo, Foo> rkt) {
        return args -> {
            ProducerRecord<String, Foo> pr = new ProducerRecord<>("topic1", "foo", new Foo("bar"));
            RequestReplyFuture<String, Foo, Foo> future = rkt.sendAndReceive(pr);
            System.out.println(future.get(10, TimeUnit.SECONDS).value());
        };
    }

    @Bean
    public NewTopic topic1() {
        return TopicBuilder.name("topic1").partitions(1).replicas(1).build();
    }

    @Bean
    public NewTopic topic2() {
        return TopicBuilder.name("topic2").partitions(1).replicas(1).build();
    }

    @Bean
    public NewTopic topic3() {
        return TopicBuilder.name("topic3").partitions(1).replicas(1).build();
    }

    @KafkaListener(id = "one", topics = "topic1")
    public void listen1(Foo in) {

        System.out.println(in);
        in.setContent(in.getContent().toUpperCase());
        ProducerRecord<String, Foo> pr = new ProducerRecord<>("topic2", in);
        this.kafkaTemplate.send(pr);
    }

    @KafkaListener(id = "two", topics = "topic2")
    public void listen2(Foo in) {
        ProducerRecord<String, Foo> pr = new ProducerRecord<>("topic3", new Foo(in.getContent() + in.getContent()));
        pr.headers().add(new RecordHeader(KafkaHeaders.CORRELATION_ID, in.getCorrelation()));
        this.kafkaTemplate.send(pr);
    }

}

class Foo {

    String content;

    byte[] correlation;

    public Foo() {
    }

    public Foo(String content) {
        this.content = content;
    }

    public String getContent() {
        return this.content;
    }

    public void setContent(String content) {
        this.content = content;
    }

    public byte[] getCorrelation() {
        return this.correlation;
    }

    public void setCorrelation(byte[] correlation) {
        this.correlation = correlation;
    }

    @Override
    public String toString() {
        return "Foo [content=" + this.content + "]";
    }

}

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.value.default.type=com.example.demo.Foo

spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties.interceptor.classes=com.example.demo.CorrelatingProducerInterceptor

and the output is:

Foo [content=bar]
Foo [content=BARBAR]

Of course, the intermediate app needs to pass the correlation id, even when it's in the payload.
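
The question does not show exactly how the id ended up different, but one plausible way (an assumption on my part) is converting the `byte[]` to a `String` and back in an intermediate step. If the id is raw binary, for example 16 bytes derived from a UUID, that round trip is lossy. The sketch below uses a hypothetical `CorrelationBytesDemo` class and a fixed UUID purely for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.UUID;

// Hypothetical demo: why a binary correlation id must be forwarded as raw bytes.
final class CorrelationBytesDemo {

    // Pack a UUID into 16 raw bytes (an assumed id layout for this sketch).
    static byte[] toBytes(UUID id) {
        return ByteBuffer.allocate(16)
                .putLong(id.getMostSignificantBits())
                .putLong(id.getLeastSignificantBits())
                .array();
    }

    // Lossy: decoding arbitrary bytes as UTF-8 replaces invalid sequences.
    static byte[] viaString(byte[] corr) {
        return new String(corr, StandardCharsets.UTF_8).getBytes(StandardCharsets.UTF_8);
    }

    // Safe: hand on a byte-for-byte copy of the id.
    static byte[] passThrough(byte[] corr) {
        return Arrays.copyOf(corr, corr.length);
    }
}
```

With an id whose first byte is `0xFF` (never valid in UTF-8), `viaString` returns different bytes than it was given, while `passThrough` returns identical content. So the intermediate app should add the header value it received, unchanged, to the outgoing record.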