1
vote

I'm trying to consume messages from several different Pub/Sub topics in the same application. One of them is polled; the others should be functional bindings. My functional consumers don't work.

@Bean
public Consumer<MessageA> messageAMessageHandler() {
    return message -> { ... };
}

@Scheduled(initialDelay = 60 * 1000, fixedRate = 600 * 1000)
public void pollBMessages() {
    this.bMessageSource.b().poll(m -> { ... }, new ParameterizedTypeReference<MessageB>() {
    });
}

My application.yml:

spring:
  cloud:

    stream:
      pubsub:
        default:
          consumer:
            auto-create-resources: true

      gcp:
        pubsub:
          bindings:
            message-a-input:
              consumer:
                ack-mode: manual

      bindings:
        messageAMessageHandler-in-0:
          destination: message-b-topic
          group: my-service
        message-a-input:
          destination: message-a-topic
          group: my-service

    function:
      definition: messageAMessageHandler;messageCMessageHandler

Polling the annotation-based MessageSource works well, but the functional bindings are not picked up. They are beans in my application context, but Spring Cloud Stream (or the GCP Pub/Sub binder) ignores them: no subscriptions are created and no messages are consumed.

What am I missing?


1 Answer

3
votes

Since your polled consumer is working, you probably have the @EnableBinding annotation somewhere in your code. If you look at the application output, you'll see a message like this:

onConfiguration$FunctionBindingRegistrar : Functional binding is disabled due to the presense of @EnableBinding annotation in your configuration.

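Spring Cloud Stream does not support mixing the legacy (annotation-based) and functional binding styles in one application. To use functional consumers, remove @EnableBinding and move the polled source to the functional style as well.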
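
A functional-only configuration might look roughly like the sketch below. It assumes a Spring Cloud Stream 3.x version that supports the spring.cloud.stream.pollable-source property, and that the GCP Pub/Sub binder version in use honours it; both are worth checking against your versions. The name polledInput is hypothetical and chosen only for illustration. The destinations, group, and function definition are copied from the question.

```yaml
spring:
  cloud:
    function:
      # Unchanged from the question.
      definition: messageAMessageHandler;messageCMessageHandler
    stream:
      # Hypothetical name (assumption). Declares a polled input without
      # @EnableBinding, replacing the annotation-based binding interface.
      pollable-source: polledInput
      bindings:
        # Functional consumer binding, as in the question.
        messageAMessageHandler-in-0:
          destination: message-b-topic
          group: my-service
        # Binding name is derived from the pollable-source name plus "-in-0".
        polledInput-in-0:
          destination: message-a-topic
          group: my-service
```

With this in place, a PollableMessageSource bean should be available for injection, so the existing @Scheduled method can keep calling poll(...) on it instead of going through the @EnableBinding interface. Any binder-specific settings, such as ack-mode, would then need to be keyed by the new binding name.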