
I have written an @Aspect to intercept reactive methods that return a Mono or Flux. Using @AfterReturning advice, I'm trying to fire an APNS notification by calling a web service.

Unfortunately, the processNotification service immediately signals onComplete without executing its chain of calls. Below is my sample program.

@Aspect
@Component
@Slf4j
public class NotifyAspect{
    private final NotificationServiceHelper notificationServiceHelper;

    @Autowired
    public NotifyAspect(NotificationServiceHelper notificationServiceHelper) {
        this.notificationServiceHelper = notificationServiceHelper;
    }

    @AfterReturning(pointcut = "@annotation(com.cupid9.api.common.annotations.Notify)", returning = "returnValue")
    public void generateNotification(JoinPoint joinPoint, Object returnValue) throws Throwable {
        log.info("AfterReturning Advice - Intercepting Method : {}", joinPoint.getSignature().getName());

        //Get Intercepted method details.
        MethodSignature signature = (MethodSignature) joinPoint.getSignature();
        Method method = signature.getMethod();

        //Get the Notification Details.
        Notify myNotify = method.getAnnotation(Notify.class);
        if (Mono.class.isAssignableFrom(returnValue.getClass())) {
            Mono<Object> result = (Mono<Object>) returnValue;
            result.doOnSubscribe(o -> {
                log.debug("On Subscription...");
                notificationServiceHelper.processNotification(myNotify.notificationType())
                .doOnError(throwable -> {
                    log.error("Exception in notification processor",throwable);
                });
            });
        }
    }

}
@Slf4j
@Service
public class NotificationServiceHelper {
    private ReactiveUserProfileRepository userProfileRepository;

    @Value("${services.notification.url}")
    private String notificationServiceUrl;

    private RestWebClient restWebClient;

    @Autowired
    public NotificationServiceHelper(RestWebClient restWebClient,
                                     ReactiveUserProfileRepository reactiveUserProfileRepository) {
        this.restWebClient = restWebClient;
        this.userProfileRepository = reactiveUserProfileRepository;
    }

    public Flux<Notification> processNotification(NotificationSchema.NotificationType notificationType) {
        /*Get user profile details*/
        return SessionHelper.getProfileId()
                .switchIfEmpty( Mono.error(new BadRequest("Invalid Account 1!")))
                .flatMap(profileId ->
                        Mono.zip(userProfileRepository.findByIdAndStatus(profileId, Status.Active), SessionHelper.getJwtToken()))
                .switchIfEmpty( Mono.error(new BadRequest("Invalid Account 2!")))
                .flatMapMany(tuple2 ->{
                    //Get user details and make sure there are some valid devices associated.
                    var userProfileSchema = tuple2.getT1();
                    log.info("Processing Notifications for User Profile : {}", userProfileSchema.getId());
                    if (Objects.isNull(userProfileSchema.getDevices()) || (userProfileSchema.getDevices().size() < 1)) {
                        return Flux.error(new InternalServerError("No Devices associate with this user. Can not send notifications."));
                    }

                    //Build Notification message from the Notification Type
                    var notificationsMap = new LinkedHashSet<Notification>();
                    userProfileSchema.getDevices().forEach(device -> {
                        var notificationPayload = Notification.builder()
                                .notificationType(notificationType)
                                .receiverDevice(device)
                                .receiverProfileRef(userProfileSchema.getId())
                                .build();
                        notificationsMap.add(notificationPayload);
                    });

                    //Get session token for authorization
                    var jwtToken = tuple2.getT2();

                    //Build the URI needed to make the rest call.
                    var uri = UriComponentsBuilder.fromUriString(notificationServiceUrl).build().toUri();
                    log.info("URI built String : {}", uri.toString());

                    //Build the Headers needed to make the rest call.
                    var headers = new HttpHeaders();
                    headers.add(HttpHeaders.ACCEPT, MediaType.APPLICATION_JSON_VALUE);
                    headers.add(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE);
                    headers.add(HttpHeaders.AUTHORIZATION, jwtToken);

                    var publishers = new ArrayList<Mono<ClientResponse>>();
                    notificationsMap.forEach(notification -> {
                        publishers.add(restWebClient.post(uri, headers, notification));
                    });
                    return Flux.merge(publishers).flatMap(clientResponse -> {
                        var httpStatus = clientResponse.statusCode();
                        log.info("NotificationService HTTP status code : {}", httpStatus.value());
                        if (httpStatus.is2xxSuccessful()) {
                            log.info("Successfully received response from Notification Service...");
                            return clientResponse.bodyToMono(Notification.class);
                        } else {
                            // return Flux.empty();
                            return clientResponse.bodyToMono(Error.class)
                                    .flatMap(error -> {
                                        log.error("Error calling Notification Service :{}", httpStatus.getReasonPhrase());
                                        if (httpStatus.value() == 400) {
                                            return Mono.error(new BadRequest(error.getMessage()));
                                        }
                                        return Mono.error(new InternalServerError(String.format("Error calling Notification Service : %s", error.getMessage())));
                                    });
                        }
                    });
                }).doOnError(throwable -> {
                    throw new InternalServerError(throwable.getMessage(), throwable);
                });
    }

}

How can we trigger this call asynchronously without making the intercepted method wait? Right now processNotification always signals onComplete without executing; the chain does not run as expected.

Your aspect tries to intercept methods annotated with @Notify, but your service is not annotated accordingly. Why do you expect it to be intercepted by the aspect? - kriegaex
Can you elaborate on what you meant by "service is not annotated accordingly"? I'm trying to send an APN notification when the API completes successfully. I use the return value from the API service annotated with @Notify to trigger the APN service logic, which is not working. Could you share simple code that uses a return value of type Mono or Flux to trigger another autowired service that completes reactively? - Ahamed
I call the NotificationServiceHelper class from the NotifyAspect class, which is annotated with @Aspect; its @AfterReturning advice runs when the aspect intercepts a method matched by the annotation pointcut. - Ahamed
What I mean is that your processNotification method does not have a @Notify annotation, so it will never be intercepted by the aspect. - kriegaex

1 Answer

@Target({ElementType.PARAMETER, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Log {

     public String title() default "";

}


@SuppressWarnings({"unchecked"})
@Around("@annotation(operlog)")
public Mono<Result> doAround(ProceedingJoinPoint joinPoint, Log operlog) {
    Mono<Result> mono;

    try {
        mono = (Mono<Result>) joinPoint.proceed();
    } catch (Throwable throwable) {
        throw new RuntimeException(throwable);
    }

    // Return the Mono produced by the operator so the caller subscribes to the extended chain.
    return mono.doOnNext(result -> {
        // doSomething(result);
    });

}
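
The difference from the question's advice comes down to two things. Reactor operators return a new publisher instead of modifying the one they are called on, and nothing executes until something subscribes. In the question, the Mono returned by doOnSubscribe is discarded, the Flux returned by processNotification is never subscribed, and @AfterReturning advice cannot hand a different object back to the caller anyway. The plain-Java sketch below illustrates that mechanism without any Reactor dependency. Lazy, apiCall, discardingAdvice and chainingAdvice are hypothetical stand-ins invented for this illustration and are not Reactor or Spring types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

// Illustration only: Lazy is a hypothetical stand-in for a cold Mono, not a Reactor type.
class LazyChainDemo {

    static final class Lazy<T> {
        private final Supplier<T> source;

        Lazy(Supplier<T> source) {
            this.source = source;
        }

        // Wraps an already-known value, loosely comparable to Mono.just.
        static <V> Lazy<V> just(V value) {
            return new Lazy<>(() -> value);
        }

        // Returns a NEW Lazy and leaves the receiver untouched.
        <R> Lazy<R> flatMap(Function<? super T, Lazy<R>> next) {
            return new Lazy<>(() -> next.apply(source.get()).subscribe());
        }

        // The only call that actually executes the chain.
        T subscribe() {
            return source.get();
        }
    }

    // Records side effects in the order they happen.
    static final List<String> events = new ArrayList<>();

    // Stand-in for the method annotated with @Notify.
    static Lazy<String> apiCall() {
        return new Lazy<>(() -> {
            events.add("api");
            return "profile-1";
        });
    }

    // Stand-in for processNotification: lazy as well, so it must be chained or subscribed.
    static Lazy<String> processNotification(String type) {
        return new Lazy<>(() -> {
            events.add("notified:" + type);
            return type;
        });
    }

    // Mirrors the question's advice: the chained value is built and then dropped.
    static Lazy<String> discardingAdvice(Lazy<String> result) {
        result.flatMap(value -> processNotification("LIKE"));
        return result;
    }

    // Mirrors an @Around advice that returns the chained value to the caller.
    static Lazy<String> chainingAdvice(Lazy<String> result) {
        return result.flatMap(value ->
                processNotification("LIKE").flatMap(ignored -> Lazy.just(value)));
    }

    public static void main(String[] args) {
        discardingAdvice(apiCall()).subscribe();
        System.out.println(events); // [api]

        events.clear();
        chainingAdvice(apiCall()).subscribe();
        System.out.println(events); // [api, notified:LIKE]
    }
}
```

Translated back to the question, one possible shape (an untested assumption about that code base) is an @Around advice that returns something like mono.flatMap(result -> notificationServiceHelper.processNotification(myNotify.notificationType()).then(Mono.just(result))). The caller then waits for the notification to finish. If it should not wait, the notification publisher has to be subscribed explicitly inside the advice instead.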