I am using Spring Cloud Stream for messaging. On the consumer side, I use an IntegrationFlow to listen to the queue. It receives and prints the messages sent by the producer, but the format is wrong, and that is the problem I am facing. The producer's content-type is application/json, yet the IntegrationFlow message payload is printed as ASCII codes. The consumer code is given below:

 @EnableBinding(UserOperationConsume.class)
 public class ConsumerController {

     @Bean
     IntegrationFlow consumerIntgrationFlow(UserOperationConsume u) {
         return IntegrationFlows
                 .from(u.userRegistraionProduces())
                 //.transform(Transformers.toJson()) // not working as expected
                 //.transform(Transformers.fromJson(UserDTO.class))
                 .handle(String.class, (payload, headers) -> {
                     System.out.println(payload.toString()); // here the output is 123,34,105,100,34,58,49,44,34,110,97,109,101,34,58,34,86,105,115,104,110,117,34,44,34,101,109,97,105,108,34,58,34,118...
                     return null;
                 }).get();
     }

 }

The input interface is:

 public interface UserOperationConsume {
  @Input
  public SubscribableChannel userRegistraionProduces();
 }

And the consumer YAML configuration is:

 server:
   port: 8181

 spring:
   application:
     name: nets-alert-service
 ---
 spring:
   cloud:
     config:
       name: notification-service
       uri: http://localhost:8888
 ---    
 spring:
   rabbitmq:
     host: localhost
     port: 5672
     username: guest
     password: guest

 ---
 spring:
   cloud:
     stream:
       bindings:
         userRegistraionProduces:
           destination: userOperations
         input:
           content-type: application/json

I have also tried binding with Sink.class, and in that case I got the exact message from the queue. Please let me know if there is a mistake in this IntegrationFlow configuration, as I am new to Spring Cloud Stream and IntegrationFlow. Is there any way to transform these ASCII codes into the original string? Thanks in advance.

1 Answer

Using IntegrationFlows.from(channel) provides no conversion hints, so you just get the raw byte[] payload (containing JSON). It's not clear why you then use a toJson() transformer.

Your .handle(String.class, (payload, headers) -> {... causes a simple ArrayToStringConverter to be used, which is why you are seeing each byte value.
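
To make the difference concrete, here is a minimal plain-Java sketch (no Spring involved) that mimics the comma-joined rendering and contrasts it with decoding the same bytes as UTF-8. The class and method names are hypothetical, and the input is only the visible prefix of the output quoted in the question, so the decoded text is an incomplete JSON fragment.

```java
import java.nio.charset.StandardCharsets;
import java.util.StringJoiner;

// Hypothetical demo class; it imitates the effect described above and is not Spring's converter.
final class BytePayloadDemo {

    // Renders each byte as its decimal value, joined by commas.
    static String joinByteValues(byte[] payload) {
        StringJoiner joiner = new StringJoiner(",");
        for (byte b : payload) {
            joiner.add(Byte.toString(b));
        }
        return joiner.toString();
    }

    // Decodes the bytes as UTF-8 text, which recovers the JSON characters.
    static String decodeUtf8(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    // Reverses the comma-joined rendering back into a byte array.
    static byte[] parseByteValues(String csv) {
        String[] parts = csv.split(",");
        byte[] bytes = new byte[parts.length];
        for (int i = 0; i < parts.length; i++) {
            bytes[i] = Byte.parseByte(parts[i].trim());
        }
        return bytes;
    }

    public static void main(String[] args) {
        // Prefix of the output quoted in the question; the original is truncated.
        String printed = "123,34,105,100,34,58,49,44,34,110,97,109,101,34,58,34,"
                + "86,105,115,104,110,117,34";
        byte[] payload = parseByteValues(printed);
        System.out.println(decodeUtf8(payload));     // prints {"id":1,"name":"Vishnu"
        System.out.println(joinByteValues(payload)); // prints the comma-separated values again
    }
}
```

So the numbers are the JSON itself, one decimal value per byte. If the handler received the payload as byte[] rather than String, the same new String(payload, StandardCharsets.UTF_8) call should give the readable text.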

In any case, you are not using the framework properly. Use...

@StreamListener("userRegistraionProduces")
public void listen(UserDTO dto) {
    System.out.println(dto);
}

...and the framework will take care of the conversion for you. Or...

@StreamListener("userRegistraionProduces")
public void listen(Message<UserDTO> dtoMessage) {
    System.out.println(dtoMessage);
}

if your producer conveys additional information in headers.
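
Both listener variants print the DTO, so the output is only readable if UserDTO overrides toString(). The question does not show that class, so the following is only a sketch: the fields id, name and email are inferred from the visible part of the payload, and their types are assumptions. It keeps a no-argument constructor and setters, which is a shape that default JSON data binding can populate.

```java
// Hypothetical shape of UserDTO; the real class is not shown in the question.
// In a real project this would normally be a public class in its own file.
class UserDTO {

    private long id;       // assumed numeric, based on "id":1 in the payload
    private String name;
    private String email;

    // No-argument constructor so a JSON mapper can instantiate the class.
    UserDTO() {
    }

    public long getId() { return id; }
    public void setId(long id) { this.id = id; }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    public String getEmail() { return email; }
    public void setEmail(String email) { this.email = email; }

    // Readable rendering for System.out.println(dto).
    @Override
    public String toString() {
        return "UserDTO{id=" + id + ", name='" + name + "', email='" + email + "'}";
    }
}
```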

EDIT

If you prefer to do the conversion yourself, this works fine...

@Bean
IntegrationFlow consumerIntgrationFlow(UserOperationConsume u) {
    return IntegrationFlows.from(u.userRegistraionProduces())
            .transform(Transformers.fromJson(UserDTO.class))
            .handle((payload, headers) -> {
                System.out.println(payload.toString());
                return null;
            }).get();
}

...since the Json transformer can read a byte[].