There was a change request on my system, which currently listens to multiple channels and sends messages to multiple channels as well. Now the destination names will be stored in the database and can change at any time. I have trouble believing I'm the first one to come across this, but I can find only limited information out there.

All I found are these two:
Dynamic sink destination: https://github.com/spring-cloud-stream-app-starters/router/tree/master/spring-cloud-starter-stream-sink-router, but how would that work for actively listening to those channels the way @StreamListener does?

Dynamic source destinations: https://github.com/spring-cloud/spring-cloud-stream-samples/blob/master/source-samples/dynamic-destination-source/, which does this:

@Bean
@ServiceActivator(inputChannel = "sourceChannel")
public ExpressionEvaluatingRouter router() {
    ExpressionEvaluatingRouter router = new ExpressionEvaluatingRouter(new SpelExpressionParser().parseExpression("payload.id"));
    router.setDefaultOutputChannelName("default-output");
    router.setChannelResolver(resolver);
    return router;
}

But what is that "payload.id"? And where are the destinations specified there?

If this is a custom app, you can use the strategies used in this sample as a blueprint for a solution. github.com/spring-cloud/spring-cloud-stream-samples/tree/master/… - sobychacko
I saw that example, but I failed to see how the destination is being passed here: @Bean @ServiceActivator(inputChannel = "sourceChannel") public ExpressionEvaluatingRouter router() { ExpressionEvaluatingRouter router = new ExpressionEvaluatingRouter(new SpelExpressionParser().parseExpression("payload.id")); router.setDefaultOutputChannelName("default-output"); router.setChannelResolver(resolver); return router; } - CCC
Don't put code in comments. It is hard to read. Edit the question instead and comment on the answer that you have done so. - Gary Russell
Do you have experience with something like this? Or know someone who does? - CCC

1 Answer

Feel free to improve my answer; I hope it helps others.

Here is the code (it worked in my debugger). This is an example, not production-ready!

This is how to send a message to a dynamic destination:

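import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.binding.BinderAwareChannelResolver;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
@EnableBinding
public class MessageSenderService {

    @Autowired
    private BinderAwareChannelResolver resolver;

    @Transactional
    public void sendMessage(final String topicName, final String payload) {
        // Looks up the output channel bound to the given destination name.
        final MessageChannel messageChannel = resolver.resolveDestination(topicName);
        messageChannel.send(new GenericMessage<String>(payload));
    }
}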

And the configuration for Spring Cloud Stream:

spring:
  cloud:
    stream:
      dynamicDestinations: output.topic.1,output.topic2,output.topic.3

I found this in the documentation: https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/index.html#dynamicdestination. It will work in Spring Cloud Stream version 2+. I use 2.1.2:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
    <version>2.1.2.RELEASE</version>
</dependency>
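Since the question says the destination names live in the database and can change at any time, the topicName passed to sendMessage has to be looked up somewhere first. Below is a minimal plain-Java sketch of one way to do that, not something taken from Spring Cloud Stream: the class DestinationDirectory, its loader, and the idea of a "logical name" are all hypothetical names I made up for illustration. It keeps an in-memory snapshot of the mapping and reloads it on demand.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/** Hypothetical helper: in-memory snapshot of logical name -> destination name. */
final class DestinationDirectory {

    // Hypothetical loader, e.g. a repository call that reads the mapping table.
    private final Supplier<Map<String, String>> loader;

    // The current snapshot; swapped atomically so readers never see a half-updated map.
    private final AtomicReference<Map<String, String>> snapshot =
            new AtomicReference<>(Map.of());

    DestinationDirectory(Supplier<Map<String, String>> loader) {
        this.loader = loader;
        refresh();
    }

    /** Reloads the mapping; could be called on a schedule or when a change is detected. */
    void refresh() {
        snapshot.set(Map.copyOf(loader.get()));
    }

    /** Returns the destination currently mapped to the logical name, if any. */
    Optional<String> destinationFor(String logicalName) {
        return Optional.ofNullable(snapshot.get().get(logicalName));
    }
}
```

The value returned by destinationFor could then be passed as topicName to sendMessage above. How often to call refresh is a design choice that depends on how quickly a database change has to take effect.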

This is how to consume a message from a dynamic destination:

https://stackoverflow.com/a/56148190/4587961

Configuration

spring:
  cloud:
    stream:
      default:
        consumer:
          concurrency: 2
          partitioned: true
      bindings:
        # inputs
        input:
          group: application_name_group
          destination: topic-1,topic-2
          content-type: application/json;charset=UTF-8

Java consumer.

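import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Component
@EnableBinding(Sink.class)
public class CommonConsumer {

    private static final Logger logger = LoggerFactory.getLogger(CommonConsumer.class);

    @StreamListener(target = Sink.INPUT)
    public void consumeMessage(final Message<Object> message) {
        logger.info("Received a message: \nmessage:\n{}", message.getPayload());

        // Header values are stored as Object, so request the topic name as a String explicitly.
        final String topic = message.getHeaders().get("kafka_receivedTopic", String.class);

        // Here I define logic which handles messages depending on message headers and topic.
        // In my case I have configuration which forwards these messages to webhooks, so I need to have mapping topic name -> webhook URI.
    }
}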
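The "mapping topic name -> webhook URI" mentioned in the comment could look roughly like the sketch below. This is plain Java and only an illustration: WebhookRegistry and its methods are hypothetical names, and the example.com URL in the usage line is a placeholder, not my real configuration.

```java
import java.net.URI;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical registry mapping a topic name to the webhook URI that receives its messages. */
final class WebhookRegistry {

    // Concurrent map, because several consumer threads may look up topics at the same time.
    private final Map<String, URI> webhooksByTopic = new ConcurrentHashMap<>();

    /** Adds or replaces the webhook for a topic, e.g. after re-reading configuration. */
    void register(String topic, String webhookUri) {
        webhooksByTopic.put(topic, URI.create(webhookUri));
    }

    /** Returns the webhook for the topic, or empty if the topic is unknown or the header was missing. */
    Optional<URI> webhookFor(String topic) {
        if (topic == null) {
            return Optional.empty();
        }
        return Optional.ofNullable(webhooksByTopic.get(topic));
    }
}
```

Usage would be something like registry.register("topic-1", "https://example.com/hooks/one") at startup, and then registry.webhookFor(topic) inside consumeMessage to decide where to forward the payload. Returning an Optional makes the "no webhook configured for this topic" case explicit instead of failing on a null.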