1
votes

I want to use spring-integration-aws to send messages to AWS SNS and to receive messages from AWS SQS. I am having some troubles understanding how to migrate my application from spring-coud-aws-messaging to use it. My code is basically a SQS config class:

@Configuration
@EnableConfigurationProperties(SqsProperties.class)
@Profile("!test")
public class SqsConfiguration {

     private final SqsProperties sqsProperties;  

     @Autowired  
     public SqsConfiguration(SqsProperties sqsProperties) {  
          this.sqsProperties = sqsProperties;  
     }  

     @Bean  
     public SimpleMessageListenerContainer simpleMessageListenerContainer() {  
          SimpleMessageListenerContainer msgListenerContainer =  
                simpleMessageListenerContainerFactory().createSimpleMessageListenerContainer();  
          msgListenerContainer.setMessageHandler(queueMessageHandler());  
          return msgListenerContainer;  
     }  

     @Bean  
     public SimpleMessageListenerContainerFactory simpleMessageListenerContainerFactory() {  
          SimpleMessageListenerContainerFactory msgListenerContainerFactory = new SimpleMessageListenerContainerFactory();  
      msgListenerContainerFactory.setAmazonSqs(amazonSQSClient());  
      msgListenerContainerFactory.setDestinationResolver(customDestinationResolver(sqsProperties));  
     return msgListenerContainerFactory;  
      }  

        @Bean  
      public QueueMessageHandler queueMessageHandler() {  
            QueueMessageHandlerFactory queueMsgHandlerFactory = new QueueMessageHandlerFactory();  
      queueMsgHandlerFactory.setAmazonSqs(amazonSQSClient());  
     return queueMsgHandlerFactory.createQueueMessageHandler();  
      }  


        @Bean(name = "amazonSQS", destroyMethod = "shutdown")  
        public AmazonSQSAsync amazonSQSClient() {  
            return AmazonSQSAsyncClientBuilder.defaultClient();  
      }  

        @Bean  
      public SqsQueuesDestinationResolver customDestinationResolver(SqsProperties sqsProperties) {  
            return new SqsQueuesDestinationResolver(sqsProperties);  
      }  
    }

that uses a customDestinationResolver (basically becasue queue names can be dynamic only due to different deployments) defined as:

    public class SqsQueuesDestinationResolver implements DestinationResolver<String> {  

        private static final Logger LOG = LoggerFactory.getLogger(SqsQueuesDestinationResolver.class);  

     private SqsProperties sqsProperties;  

      @Autowired  
      private AmazonSQSAsync amazonSQSAsync;  

     public SqsQueuesDestinationResolver(SqsProperties sqsProperties) {  
            this.sqsProperties = sqsProperties;  
      }  

        @Override  
      public String resolveDestination(String queueName) throws DestinationResolutionException {  
            String finalQueueName = getFinalQueueName(queueName);  
     try {  
                return amazonSQSAsync.getQueueUrl(finalQueueName).getQueueUrl();  
      }  
            catch (QueueDoesNotExistException queueDoesNotExistException) {  
                LOG.error(String.format("The '%s' queue was not found.", finalQueueName));  
      }  
            return null;  
      }  

        private String getFinalQueueName(String queueName) {  
            String finalQueueName;  
     switch (queueName) {  
                case "queue-foo":  
                    finalQueueName = sqsProperties.getFooQueue();  
     break; case "queue-bar":  
                    finalQueueName = sqsProperties.getBarQueue();  
     break;  
     default:  
                    finalQueueName = null;  
      }  
            return finalQueueName;  
      }  
    }

And that's all: Basically with this configuration I just need to use the annotation @SqsListener("foo-queue") or @SqsListener("bar-queue") in a proper consumer method

@SqsListener("foo-queue")
public void listen(String message){
     processMessage(message);
}

I have been trying to follow the documentation in https://github.com/spring-projects/spring-integration-aws#spring-integrations-extensions-to-aws and what I am struggling to understand in the SQS part is in the "Inbound Channel Adapter" chapter what queues should I give as an argument to the constructor of SqsMessageDrivenChannelAdapter() since I am using a custom destinationResolver, and how exactly do I consume the messages or if it supposed to work with the @SqsListener annotation from spring-cloud-aws-messaging as before.

Tks a lot for the help and if this is not the right place to ask or if it is a very dumb question, I am sorry, I am just trying out this for the first time :)

1

1 Answers

0
votes

I'm not sure what you are missing, but you still can use your foo-queue and bar-queue in the SqsMessageDrivenChannelAdapter(AmazonSQSAsync amazonSqs, String... queues) ctor.

Your custom DestinationResolver indeed can be used over there as well:

public void setDestinationResolver(DestinationResolver<String> destinationResolver) {
    this.simpleMessageListenerContainerFactory.setDestinationResolver(destinationResolver);
}

In fact this SqsMessageDrivenChannelAdapter is fully based on the same SimpleMessageListenerContainerFactory.

To consume messages received frm SQS you just need to configure this channel adapter with a MessageChannel to produce message. Then you subscribe to that channel downstream.

See more about channels and messages in Spring Integration Reference Manual: https://docs.spring.io/spring-integration/docs/5.3.0.M4/reference/html/index.html