
I have a simple Kafka producer in my Spring Cloud Stream application. When the application starts, a @PostConstruct method performs some reconciliation and tries to send events to the Kafka producer.

The issue is that the Kafka producer is not yet ready when the reconciliation starts sending events to it, which leads to the error below:

org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'orderbook-service-1.orderbook'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage ..
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445)

Is there a way to get a notification during application startup that the Kafka channel has been initialized, so that I only kick off the reconciliation job after that?

Here are my code snippets:

public interface OrderEventChannel {
    String TOPIC_BINDING = "orderbook";
    @Output(TOPIC_BINDING)
    SubscribableChannel outboundEvent();
}

@Configuration
@EnableBinding({OrderEventChannel.class})
@ConditionalOnExpression("${aix.core.stream.outgoing.kafka.enabled:false}")
public class OutgoingKafkaConfiguration {
}

@Service
public class OutgoingOrderKafkaProducer {

    @Autowired
    private OrderEventChannel orderEventChannel;

    public void onOrderEvent( ClientEvent clientEvent ) {

        try {
            Message<KafkaEvent> kafkaMsg = mapToKafkaMessage( clientEvent );
            SubscribableChannel subscribableChannel = orderEventChannel.outboundEvent();
            subscribableChannel.send( kafkaMsg );
        } catch ( RuntimeException rte ) {
            log.error( "Error while publishing Kafka event [{}]", clientEvent, rte );
        }
    }
..
..

}

1 Answer

@PostConstruct is MUCH too early in the context lifecycle to start using beans; they are still being created, configured and wired together.

You can use an ApplicationListener (or @EventListener) to listen for an ApplicationReadyEvent. Be sure to compare the event's applicationContext to the main application context, because you may receive the same event from other contexts.
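
A minimal sketch of the listener approach, assuming Spring Boot is on the classpath. The class name ReconciliationOnReady and the injected Runnable standing in for the reconciliation job are hypothetical and not taken from the question; the context check is pulled into a small helper so it can be exercised without building a real event.

```java
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationContext;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;

@Component
class ReconciliationOnReady {

    // The context that owns this bean, i.e. the main application context.
    private final ApplicationContext applicationContext;

    // Hypothetical stand-in for the reconciliation job from the question.
    private final Runnable reconciliationJob;

    ReconciliationOnReady(ApplicationContext applicationContext, Runnable reconciliationJob) {
        this.applicationContext = applicationContext;
        this.reconciliationJob = reconciliationJob;
    }

    @EventListener
    public void onApplicationReady(ApplicationReadyEvent event) {
        runIfMainContext(event.getApplicationContext());
    }

    // Ignore events published for any context other than our own.
    void runIfMainContext(ApplicationContext eventContext) {
        if (eventContext == applicationContext) {
            reconciliationJob.run();
        }
    }
}
```

The reference comparison is deliberate: the event should carry the very same context instance that was injected into the bean, so anything else is treated as an event from another context.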

You can also implement SmartLifecycle and put your code in start(); give your bean a late phase so it is started after everything is wired up.

Output bindings are started in phase Integer.MIN_VALUE + 1000, input bindings are started in phase Integer.MAX_VALUE - 1000.

So if you want to do something before messages start flowing, use a phase in-between these (e.g. 0, which is the default).
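
A rough sketch of the SmartLifecycle approach with a phase between the two binding phases. ReconciliationLifecycle and the injected Runnable are hypothetical names. The phase is returned explicitly rather than inherited, since what SmartLifecycle provides by default differs between Spring versions.

```java
import java.util.concurrent.atomic.AtomicBoolean;

import org.springframework.context.SmartLifecycle;
import org.springframework.stereotype.Component;

@Component
class ReconciliationLifecycle implements SmartLifecycle {

    // Hypothetical stand-in for the reconciliation job from the question.
    private final Runnable reconciliationJob;

    private final AtomicBoolean running = new AtomicBoolean(false);

    ReconciliationLifecycle(Runnable reconciliationJob) {
        this.reconciliationJob = reconciliationJob;
    }

    @Override
    public void start() {
        // Run the job only on the transition from stopped to running.
        if (running.compareAndSet(false, true)) {
            reconciliationJob.run();
        }
    }

    @Override
    public void stop() {
        running.set(false);
    }

    @Override
    public void stop(Runnable callback) {
        stop();
        callback.run(); // tell the container that shutdown of this bean is complete
    }

    @Override
    public boolean isRunning() {
        return running.get();
    }

    @Override
    public boolean isAutoStartup() {
        return true; // start automatically when the context is refreshed
    }

    @Override
    public int getPhase() {
        // After output bindings (Integer.MIN_VALUE + 1000),
        // before input bindings (Integer.MAX_VALUE - 1000).
        return 0;
    }
}
```

Because start() runs on the thread that starts the context, a long reconciliation will delay the start of the input bindings; whether that is desirable depends on the application.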