1
votes

I am integration testing a spring integration flow, which starts with a Mail inbound channel adapter. I send test emails to a mock GreenMail email server and then test the expected outcome. But because the email is asynchronous, the test currently only passes if I wait after sending the mail, until the flow completes.

Here is the mail adapter config:

<int-mail:inbound-channel-adapter id="imapAdapter"
      store-uri="#{mailConnectionString}"
      java-mail-properties="javaMailProperties" channel="inboundChannel"
      should-delete-messages="false" should-mark-messages-as-read="true"
      auto-startup="true">
    <int:poller id="emailPoller" max-messages-per-poll="1" fixed-rate="5000">
    </int:poller>
</int-mail:inbound-channel-adapter>

So, with reference to this: Adding Completion Advice, I thought I could simply wait for the completion advice and then continue testing. But you can't add advice to the mail adapter:

Caused by: org.springframework.beans.NotWritablePropertyException: Invalid property 'adviceChain' of bean class [org.springframework.integration.config.SourcePollingChannelAdapterFactoryBean]: Bean property 'adviceChain' is not writable or has an invalid setter method. Does the parameter type of the setter match the return type of the getter?

Also tried the reply producing handler (from above link), but no bean was found.

So. How do I add advice to the inbound mail adapter? Or is there a better way to test the mail adapter once the entire flow completes?

Update after answer suggestion I changed the test to add the advice within the set up.

@Autowired
private SourcePollingChannelAdapter emailAdapter;

private MyAdvice imapAdapterCompletionAdvice;

@Before
public void setup() throws Exception
{
    imapAdapterCompletionAdvice = new MyAdvice();
    List<Advice> theAdvice = new ArrayList<Advice>();
    theAdvice.add(imapAdapterCompletionAdvice);
    emailAdapter.setAdviceChain(theAdvice);
    emailAdapter.start();           
} 

But the advice is not called. Am I missing something?

Here is the Advice class:

public class MyAdvice implements MethodInterceptor {

    private final CountDownLatch latch = new CountDownLatch(1);

    public Object invoke(MethodInvocation invocation) throws Throwable {
        Object proceed = invocation.proceed();
        System.out.println(proceed);
        if (proceed instanceof Boolean) {
            Boolean mailReceived = (Boolean) proceed;
            if(mailReceived){
                latch.countDown();
            }
        }

        return proceed;
    }

    public CountDownLatch getLatch() {
        return latch;
    }
}
1

1 Answers

1
votes

I am not entirely clear about what you are trying to do but inbound channel adapters are not message handlers (hence there is no handler bean). Polled inbound adapters are MessageSource but advising the source won't help because we invoke receive() and then send the message to the flow.

You can, however, add Advice objects to the <poller/>'s advice-chain.

An around advice would cover both the receive() (from the source) and send() to the channel so you can suspend the thread there.

You can just configure the advice on the poller directly. If you want to do it programmatically, the adviceChain property is on the pollerMetadata field of the factory bean.

EDIT

I wouldn't use a BFPP for this any more - that question/answer is old; we now expose the handler as a bean name id.handler (and the messsage source similarly) so a BFPP is no longer needed.

It's much easier to wait for the bean to have been created rather than trying to inject the property into the bean definition.

I would do something like this...

  1. Set auto-startup to false in the test case (use a property placeholder so it's true for production, false for test).
  2. Inject the advice chain.
  3. Start the adapter...

.

@Autowired
private SourcePollingChannelAdapter adapter;

...

@Test
public ... {

    this.adapter.setAdviceChain(...);
    this.adapter.start();
    ...
}

If you don't want to use this technique, use a BeanPostProcessor (postProcessAfterInitialization - after the factory bean has served up the channel adapter) rather than a BeanFactoryPostProcessor to modify the advice chain.

You are correct in that the advice will be invoked even when there is no result from the poll.

You can use another advice (subclass of AbstractMessageSourceAdvice - see Smart Polling.

This advice advises just the receive() method and can tell if the result of the poll is a message; it can then arm your other advice to fire after the message is processed.

EDIT2

Need to reset the initialized flag so that the advice(s) will be re-applied. This can be done using reflection (Spring has a convenient DirectFieldAccessor).

If you're not comfortable with using reflection, you can start/stop/start which will do the same thing, but we need to make sure the first start doesn't actually trigger a poll.

Example with reflection:

@Autowired
private SourcePollingChannelAdapter adapter;

@Test
public void testAdvice() throws Exception {
    List<Advice> adviceChain = new ArrayList<Advice>();
    final AtomicBoolean hasMessage = new AtomicBoolean();
    final CountDownLatch latch = new CountDownLatch(1);
    class MessageDetector extends AbstractMessageSourceAdvice {

        @Override
        public boolean beforeReceive(MessageSource<?> source) {
            return true;
        }

        @Override
        public Message<?> afterReceive(Message<?> result, MessageSource<?> source) {
            hasMessage.set(result != null);
            System.out.println("has message:" + hasMessage.get());
            return result;
        }

    }
    adviceChain.add(new MessageDetector());
    class MyAdvice implements MethodInterceptor {

        @Override
        public Object invoke(MethodInvocation invocation) throws Throwable {
            System.out.println("in myAdvice before, hasmessage:" + hasMessage.get());
            Object proceed = invocation.proceed();
            System.out.println("in myAdvice after, hasmessage:" + hasMessage.get());
            latch.countDown();
            return proceed;
        }

    }
    adviceChain.add(new MyAdvice());
    adapter.setAdviceChain(adviceChain);
    new DirectFieldAccessor(adapter).setPropertyValue("initialized", false);
    adapter.start();
    assertTrue(latch.await(10, TimeUnit.SECONDS));
}

Example with trigger manipulation...

@Autowired
private SourcePollingChannelAdapter adapter;

@Test
public void testAdvice() throws Exception {
    List<Advice> adviceChain = new ArrayList<Advice>();
    final AtomicBoolean hasMessage = new AtomicBoolean();
    final CountDownLatch latch = new CountDownLatch(1);
    class MessageDetector extends AbstractMessageSourceAdvice {

        @Override
        public boolean beforeReceive(MessageSource<?> source) {
            return true;
        }

        @Override
        public Message<?> afterReceive(Message<?> result, MessageSource<?> source) {
            hasMessage.set(result != null);
            System.out.println("has message:" + hasMessage.get());
            return result;
        }

    }
    adviceChain.add(new MessageDetector());
    class MyAdvice implements MethodInterceptor {

        @Override
        public Object invoke(MethodInvocation invocation) throws Throwable {
            System.out.println("in myAdvice before, hasmessage:" + hasMessage.get());
            Object proceed = invocation.proceed();
            System.out.println("in myAdvice after, hasmessage:" + hasMessage.get());
            latch.countDown();
            return proceed;
        }

    }
    adviceChain.add(new MyAdvice());
    adapter.setAdviceChain(adviceChain);
    adapter.setTrigger(new Trigger() {

        @Override
        public Date nextExecutionTime(TriggerContext triggerContext) {
            return null; // never poll
        }
    });
    adapter.start();
    adapter.stop();
    adapter.setTrigger(new PeriodicTrigger(1000));
    adapter.start();
    assertTrue(latch.await(10, TimeUnit.SECONDS));
}