0
votes

I'm interested to use Publisher Confirms in some producers that we have in a project, using Spring Cloud Stream. I have tried doing a small PoC but it is not working. As far as I see in the documentation, this is possible for Asyncrhonous Publisher Confirm, and it should be as easy as do the next changes:

Add in the application.yml the confirmAckChannel and enable the errorChannelEnabled property.

spring.cloud.stream:
  binders:
    rabbitDefault:
      defaultCandidate: false
      type: rabbit
      environment.spring.rabbitmq.host: ${spring.rabbitmq.addresses}
 ....
  bindings:    
    testOutput:
      destination: test
      binder: rabbitDefault
      content-type: application/json    
  rabbit.bindings:   
    testOutput.producer:
      confirmAckChannel: "testAck"
      errorChannelEnabled: true

Then a simple service triggered by an endpoint, where I insert the header related with the errorChannel to the event.

@Service
@RequiredArgsConstructor
public class TestService {

    private final TestPublisher testPublisher;

    public void sendMessage() {

        testPublisher.send(addHeaders());
    }

    private Message<Event<TestEvent>> addHeaders() {
        return withPayload(new Event<>(TestEvent.builder().build()))
                .setHeader(MessageHeaders.ERROR_CHANNEL, "errorChannelTest")
                .build();
    }
}

And then the Publisher of RabbitMQ

@Component
@RequiredArgsConstructor
public class TestPublisher {

    private final MessagingChannels messagingChannels;

    public boolean send(Message<Event<TestEvent>> message) {
        return messagingChannels.test().send(message);
    }
}

Where MessagingChannels is implemented as

public interface MessagingChannels {

    @Input("testAck")
    MessageChannel testAck();

    @Input("errorChannelTest")
    MessageChannel testError();


    @Output("testOutput")
    MessageChannel test();
}

After that, I have implemented 2 listeners, one for errorChannelTest input and the other one for testAck.

@Slf4j
@Component
@RequiredArgsConstructor
class TestErrorListener {

    @StreamListener("errorChannelTest")
    void onCommandReceived(Event<Message> message) {

        log.info("Message error received: " + message);
    }
}
@Slf4j
@Component
@RequiredArgsConstructor
class TestAckListener {

    @StreamListener("testAck")
    void onCommandReceived(Event<Message> message) {

        log.info("Message ACK received: " + message);
    }
}

However, I didn't receive any ACK or NACK for RabbitMQ in these 2 listeners, the event was sent properly to RabbitMQ and manage by the exchange, but then I haven't received any response from RabbitMQ.

Am I missing something? I have checked also with these 2 properties, but it doesn't work as well

spring:
  rabbitmq:
    publisher-confirm-type: CORRELATED
    publisher-returns: true

I'm using Spring-Cloud-Stream 3.0.1.RELEASE and spring-cloud-starter-stream-rabbit 3.0.1.RELEASE

----EDITED------

This is the sample working updated with the recommendations of Gary Russell

Application.yml

spring.cloud.stream:
  binders:
   rabbitDefault:
      defaultCandidate: false
      type: rabbit
      environment.spring.rabbitmq.host: ${spring.rabbitmq.addresses}   
  bindings:   
    testOutput:
      destination: exchange.output.test
      binder: rabbitDefault
      content-type: application/json
    testOutput.producer:
      errorChannelEnabled: true  
  rabbit.bindings:   
    testOutput.producer:
      confirmAckChannel: "testAck"
spring:
  rabbitmq:
    publisher-confirm-type: correlated
    publisher-returns: true

TestService

@Service
@RequiredArgsConstructor
public class TestService {

    private final TestPublisher testPublisher;

    public void sendMessage() {

        testPublisher.send(addHeaders());
    }

    private Message<Event<TestEvent>> addHeaders(Test test) {
        return withPayload(new Event<>(TestEvent.builder().test(test).build()))
                .build();
    }
}

TestService is triggered by an endpoint in the next simple controller to check this PoC.

@RestController
@RequiredArgsConstructor
public class TestController {

    private final TestService testService;

    @PostMapping("/services/v1/test")
    public ResponseEntity<Object> test(@RequestBody Test test) {

        testService.sendMessage(test);
        return ResponseEntity.ok().build();
    }
}

And then the Publisher of RabbitMQ with both ServiceActivators

@Component
@RequiredArgsConstructor
public class TestPublisher {

    private final MessagingChannels messagingChannels;

    public boolean send(Message<Event<TestEvent>> message) {

        log.info("Message for Testing Publisher confirms sent: " + message);
        return messagingChannels.test().send(message);
    }

    @ServiceActivator(inputChannel = TEST_ACK)
    public void acks(Message<?> ack) {
        log.info("Message ACK received for Test: " + ack);
    }

    @ServiceActivator(inputChannel = TEST_ERROR)
    public void errors(Message<?> error) {
        log.info("Message error for Test received: " + error);
    }
}

Where MessagingChannels is implemented as

public interface MessagingChannels {

    @Input("testAck")
    MessageChannel testAck();

    @Input("testOutput.errors")
    MessageChannel testError();


    @Output("testOutput")
    MessageChannel test();
}

This is the Main of the application (I have checked with @EnableIntegration too).

@EnableBinding(MessagingChannels.class)
@SpringBootApplication
@EnableScheduling
public class Main {

    public static void main(String[] args) {
        SpringApplication.run(Main.class, args);
    }
}
1

1 Answers

0
votes

testAck should not be a binding; it should be a @ServiceActivator instead.

.setHeader(MessageHeaders.ERROR_CHANNEL, "errorChannelTest")

That won't work in this context; errors are sent to a channel named testOutput.errors; again; this needs a @ServiceActivator, not a binding.

You have errorChannelEnabled in the wrong place; it's a common producer property, not rabbit-specific.

@SpringBootApplication
@EnableBinding(Source.class)
public class So62219823Application {

    public static void main(String[] args) {
        SpringApplication.run(So62219823Application.class, args);
    }

    @InboundChannelAdapter(channel = "output")
    public String source() {
        return "foo";
    }

    @ServiceActivator(inputChannel = "acks")
    public void acks(Message<?> ack) {
        System.out.println("Ack: " + ack);
    }

    @ServiceActivator(inputChannel = "output.errors")
    public void errors(Message<?> error) {
        System.out.println("Error: " + error);
    }

}
spring:
  cloud:
    stream:
      bindings:
        output:
          producer:
            error-channel-enabled: true
      rabbit:
        bindings:
          output:
            producer:
              confirm-ack-channel: acks
  rabbitmq:
    publisher-confirm-type: correlated
    publisher-returns: true