1
votes

I'm using the Kafka example from GitHub (https://github.com/marinkobabic/axon-kafka-example) and it's working when connecting to a locally deployed Kafka cluster. As suggested by Axon's docs, I'm trying to use a custom kafka ProducerFactory by exposing a KafkaPublisher bean and overriding withProducerFactory(). My bean is created, but createProducer() of my custom kafka ProducerFactory is never getting called. In fact, Axon's DefaultProducerFactory is still used. Any suggestions?

@Bean
KafkaPublisher<byte[], byte[]> kafkaPublisher(ProducerFactory factory) {
    KafkaPublisherConfiguration configuration = KafkaPublisherConfiguration.<String, byte[]>builder()
        .withMessageSource(new SimpleEventBus())
        .withProducerFactory(new org.axonframework.kafka.eventhandling.producer.ProducerFactory<String, byte[]>() {
            @Override
            public Producer<String, byte[]> createProducer() {
                return factory.createProducer();
            }

            @Override
            public void shutDown() {
            }
        })
        .withTopic(topic)
        .build();
    KafkaPublisher<byte[], byte[]> publisher = new KafkaPublisher<>(configuration);
    publisher.start();
    return publisher;
}
1
Hi @blackcompe, which documentation are you referencing towards? Axon's Kafka section, or documentation tied to the sample from Marinko? Added, the Kafka-Extension and Axon's documentation in this part uses a DefaultProducerFactory, whilst you're creating an anonymous class in this snippet.Steven
I see you've resolved the problem at hand already, great job!Steven

1 Answers

2
votes

I was able to solve my problem by keeping the KafkaPublisher, but I needed to expose an EventBus bean (which I qualified in Sender.java) and wire up the publisher to it. I also needed to expose my own axon ProducerFactory that wrapped my own kafka ProducerFactory. My final configuration is as follows:

@Configuration
@AutoConfigureBefore(KafkaAutoConfiguration.class)
class AxonConfig {

    @Value("${axon.kafka.default-topic}")
    private String topic;

    @Bean
    public org.axonframework.kafka.eventhandling.producer.ProducerFactory<byte[], byte[]> producerFactory(ProducerFactory factory) {
        return new org.axonframework.kafka.eventhandling.producer.ProducerFactory<byte[], byte[]>() {
            @Override
            public Producer<byte[], byte[]> createProducer() {
                return factory.createProducer();
            }

            @Override
            public void shutDown() {
            }
        };
    }

    @Bean("event-bus")
    EventBus eventBus() {
        return new SimpleEventBus();
    }

    @Bean
    KafkaPublisher<byte[], byte[]> kafkaPublisher(org.axonframework.kafka.eventhandling.producer.ProducerFactory factory, EventBus eventBus) {
        KafkaPublisherConfiguration configuration = KafkaPublisherConfiguration.<String, byte[]>builder()
            .withMessageSource(eventBus)
            .withProducerFactory(factory)
            .withTopic(topic)
            .build();
        MyKafkaPublisher<byte[], byte[]> publisher = new MyKafkaPublisher<>(configuration);
        eventBus.subscribe((events) -> publisher.sendEvents(events));
        publisher.start();
        return publisher;
    }
}