I'm developing a Spring Integration app that has a Kafka outbound channel adapter, and I'm configuring the flows using the Spring Integration Java DSL.

  • spring-integration-core : 4.2.4.RELEASE
  • spring-integration-kafka : 1.3.0.RELEASE
  • spring-integration-java-dsl : 1.1.2.RELEASE

I have configured the message handler spec similar to the following snippet.

            KafkaProducerMessageHandlerSpec messageHandlerSpec = Kafka
                .outboundChannelAdapter()
                .addProducer(new ProducerMetadata<String, byte[]>(topicName, String.class, byte[].class,
                        new StringSerializer(), new ByteArraySerializer()), "localhost:9092");

I wish to add a ProducerListener. This capability was added to Spring Integration Kafka in this pull request: https://github.com/spring-projects/spring-integration-kafka/pull/80

Can you please provide me with the appropriate mechanism to add the ProducerListener using the Java DSL?

Thanks.

1 Answer

Well, this sounds like a good improvement to the SI Java DSL for SI Kafka 1.3. Right now the DSL is compatible with SI Kafka 1.2.x.

While we try to figure out a fix (feel free to raise a GitHub issue for it), here is a workaround for you:

KafkaProducerMessageHandlerSpec messageHandlerSpec = Kafka
            .outboundChannelAdapter()
            .addProducer(new ProducerMetadata<String, byte[]>(topicName, String.class, byte[].class,
                    new StringSerializer(), new ByteArraySerializer()), "localhost:9092");

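// Workaround: take the first component the spec registers and cast it to the KafkaProducerContext.
KafkaProducerContext kafkaProducerContext = (KafkaProducerContext) messageHandlerSpec.getComponentsToRegister().iterator().next();
// Look up the producer configuration for the topic and attach the listener to it.
ProducerConfiguration<?, ?> producerConfiguration = kafkaProducerContext.getTopicConfiguration(topicName);
producerConfiguration.setProducerListener(myProducerListener);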