0
votes

I am trying to perform the following actions

  1. Aggregating messages
  2. Launching Spring Cloud Task But not able to pass the aggregated message to the method launching Task. Below is the piece of code
    @Autowired
    private TaskProcessorProperties processorProperties;

    @Autowired
    Processor processor;

    @Autowired
    private AppConfiguration  appConfiguration ;


    @Transformer(inputChannel = MyProcessor.intermidiate, outputChannel = Processor.OUTPUT)
    public Object setupRequest(String message) {
        Map<String, String> properties = new HashMap<>();

        if (StringUtils.hasText(this.processorProperties.getDataSourceUrl())) {
            properties.put("spring_datasource_url", this.processorProperties.getDataSourceUrl());
        }
        if (StringUtils.hasText(this.processorProperties.getDataSourceDriverClassName())) {
            properties.put("spring_datasource_driverClassName", this.processorProperties
                    .getDataSourceDriverClassName());
        }
        if (StringUtils.hasText(this.processorProperties.getDataSourceUserName())) {
            properties.put("spring_datasource_username", this.processorProperties
                    .getDataSourceUserName());
        }
        if (StringUtils.hasText(this.processorProperties.getDataSourcePassword())) {
            properties.put("spring_datasource_password", this.processorProperties
                    .getDataSourcePassword());
        }
        properties.put("payload", message);

        TaskLaunchRequest request = new TaskLaunchRequest(
                this.processorProperties.getUri(), null, properties, null,
                this.processorProperties.getApplicationName());
        System.out.println("inside task launcher  **************************");
        System.out.println(request.toString() +"**************************");
        return new GenericMessage<>(request);
    }


    @ServiceActivator(inputChannel = Processor.INPUT,outputChannel = MyProcessor.intermidiate)
    @Bean
    public MessageHandler aggregator() {
        AggregatingMessageHandler aggregatingMessageHandler =
                new AggregatingMessageHandler(new DefaultAggregatingMessageGroupProcessor(),
                        new SimpleMessageStore(10));
        AggregatorFactoryBean aggregatorFactoryBean = new AggregatorFactoryBean();
        //aggregatorFactoryBean.setMessageStore();
        //aggregatingMessageHandler.setOutputChannel(processor.output());
        //aggregatorFactoryBean.setDiscardChannel(processor.output());
        aggregatingMessageHandler.setSendPartialResultOnExpiry(true);
        aggregatingMessageHandler.setSendTimeout(1000L);
        aggregatingMessageHandler.setCorrelationStrategy(new ExpressionEvaluatingCorrelationStrategy("'FOO'"));
        aggregatingMessageHandler.setReleaseStrategy(new MessageCountReleaseStrategy(3)); //ExpressionEvaluatingReleaseStrategy("size() == 5")
        aggregatingMessageHandler.setExpireGroupsUponCompletion(true);
        aggregatingMessageHandler.setGroupTimeoutExpression(new ValueExpression<>(3000L)); //size() ge 2 ? 5000 : -1
        aggregatingMessageHandler.setExpireGroupsUponTimeout(true);
        return aggregatingMessageHandler;
    }

To pass the message between aggregator and task launcher method (setupRequest(String message)) , i am using a channel MyProcessor.intermidiate defined as below

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.stereotype.Indexed;


public interface MyProcessor {
    String intermidiate = "intermidiate";

    @Output("intermidiate")
    MessageChannel intermidiate();

}

Applicaion.properties used is below

aggregator.message-store-type=persistentMessageStore
spring.cloud.stream.bindings.input.destination=output
spring.cloud.stream.bindings.output.destination=input

Its not working , With the above mentioned approach .

In this class if i change the channel name from my defined channel MyProcessor.intermediate to Processor.input or Processor.output than any one of the things works (based on the channel name changed to Processor.*)

I want to aggregate the messages first and than want to launch task on aggragated messages in processor, which is not happening

1

1 Answers

0
votes

See here:

public Object setupRequest(String message) {

So, you expect some string as a request payload.

Your AggregatorFactoryBean use a DefaultAggregatingMessageGroupProcessor, which does exactly this:

    List<Object> payloads = new ArrayList<Object>(messages.size());
    for (Message<?> message : messages) {
        payloads.add(message.getPayload());
    }
    return payloads;

So, it is definitely not a String.

It is strange that you don't show what exception happens with your configuration, but I assume you need to change setupRequest() signature to expect a List of payloads or you need to provide some custom MessageGroupProcessor to build that String from the group of messages you have aggregated.