1
votes

I have an aggregator utility class in which I have to join data from more than one Cassandra table. My production code looks like the snippet below, though it is not exactly the same.

        @Autowired FollowersRepository followersRepository;
        @Autowired TopicRepository topicRepository;
        @GetMapping("/info")
        public Flux<FullDetails> getData(){
            return Flux.create(emitter ->{
                followersRepository.findAll() 
                .doOnNext(data -> {
                    List<String> all = data.getTopiclist(); //will get list of topic id
                    List<Alltopics> processedList = new ArrayList<Alltopics>();
                    all.forEach(action -> {
                        topicRepository.findById(action) //will get full detail about topic
                        .doOnSuccess(topic ->{
                            processedList.add(topic);
                            if (processedList.size() >= all.size()) {
                                FullDetails fulldetails = new FullDetails(action,processedList);
                                emitter.next(fulldetails);
                                //emitter.complete();
                            }
                        })
                        .subscribe();
                    });
                })
                .doOnComplete(() ->{
                    System.out.println("All the data are processed !!!");
                    //emitter.complete(); // executing if all the data are pushed from database not waiting for doOnNext method to complete.
                })
                .subscribe();
            });
        }

For more details, refer to the code at CodeLink.

I have tried doOnComplete and doOnFinally on the outer Flux, but they do not wait for all the inner non-blocking calls to complete.

I want to call onComplete only after all the nested non-blocking Mono/Flux requests inside the Flux have been processed.

With nested blocking Flux/Mono calls, the outer Flux's doOnComplete does execute after the inner Flux/Mono completes.


P.S.

In the example below, I cannot find where to place emitter.complete(), because doOnComplete() is called before all the inner Monos have completed.

Request body:

[{ "content":"Intro to React and operators", "author":"Josh Long", "name":"Spring WebFlux" },{ "content":"Intro to Flux", "author":"Josh Long", "name":"Spring WebFlux" },{ "content":"Intro to Mono", "author":"Josh Long", "name":"Spring WebFlux" }] 

My REST controller:

@PostMapping("/topics")
    public Flux<?> loadTopic(@RequestBody Flux<Alltopics> data)
    {
        return Flux.create(emitter ->{
            data
            .map(topic -> {
                topic.setTopicid(null ==topic.getTopicid() || topic.getTopicid().isEmpty()?UUID.randomUUID().toString():topic.getTopicid());
                return topic;
            })
            .doOnNext(topic -> {
                topicRepository.save(topic).doOnSuccess(persistedTopic ->{
                    emitter.next(persistedTopic);
                    //emitter.complete();
                }).subscribe();
            })
            .doOnComplete(() -> {
                //emitter.complete();
                System.out.println(" all the data are processed!!!");
            }).subscribe();
        });
    }
1
I'd like to help here, but even after looking at your code repository, I don't understand the current code snippet. It seems this snippet is using the topicId as the userId in FullDetails. - Brian Clozel
Thanks @BrianClozel, it should be data.getUserid() instead of action. The problem I am facing is how to identify when all the non-blocking Mono/Flux calls have completed for a given outer Flux, like a list-size check in imperative style. - Prabu Subra
The way this question is set up still doesn't help. One has to check out your code repository and try things in an IDE to understand what you're trying to achieve. Hardly useful for other community members. - Brian Clozel
OK, I will try to reframe it in a different way. - Prabu Subra
Thanks, I'll take another look! - Brian Clozel

1 Answer

2
votes

Here are a few rules that you should follow when writing a reactive pipeline:

  1. doOnXYZ operators should never be used for I/O, latency-bound operations, or any other reactive operation. They are meant for "side-effect" operations such as logging and metrics.
  2. You should never subscribe from within a pipeline or a method that returns a reactive type. Doing so decouples the processing of that operation from the main pipeline, meaning there is no guarantee that you'll get the expected result at the right time, nor that the complete/error signals will be known to your application.
  3. You should never block from within a pipeline or a method that returns a reactive type. This will create critical issues in your application at runtime.
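
To make rule 2 concrete, here is a minimal sketch of the difference, assuming only reactor-core on the classpath. The class name InnerSubscribeSketch and the lookup method are hypothetical; lookup merely stands in for a non-blocking repository call and is not part of your code base. The block() and blockLast() calls are there only so the sketch can be run from a plain main method or a test; they do not belong in a real pipeline (see rule 3).

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class InnerSubscribeSketch {

    // Hypothetical stand-in for a non-blocking repository call.
    static Mono<String> lookup(int id) {
        return Mono.delay(Duration.ofMillis(200)).map(tick -> "topic-" + id);
    }

    // Anti-pattern: the inner Mono is subscribed inside doOnNext, so the
    // outer Flux knows nothing about it and completes without waiting.
    static List<String> detached() {
        List<String> seen = new CopyOnWriteArrayList<>();
        Flux.range(1, 3)
            .doOnNext(id -> lookup(id).doOnNext(seen::add).subscribe())
            .blockLast(); // returns as soon as the outer Flux completes
        return List.copyOf(seen); // snapshot taken at outer completion
    }

    // Composed: flatMap subscribes to each inner Mono itself, so the
    // resulting Flux completes only after every lookup has completed.
    static List<String> composed() {
        return Flux.range(1, 3)
            .flatMap(id -> lookup(id))
            .collectList()
            .block();
    }
}
```

With the detached variant, the snapshot will typically be empty, because the outer Flux completes long before the delayed lookups emit. The composed variant always contains all three results, although flatMap does not guarantee their order.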

Because your code snippet is quite convoluted, I'll just show the general direction to follow with another code snippet:

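@GetMapping("/info")
public Flux<FullDetails> getData(){
    return followersRepository.findAll()
        // flatMap subscribes to each inner Mono, so the outer Flux completes only after all of them do
        .flatMap(follower -> {
            // fetch every topic of this follower and gather them into a single list
            Mono<List<Alltopics>> topics = topicRepository.findAllById(follower.getTopiclist()).collectList();
            return topics.map(topiclist -> new FullDetails(follower.getId(), topiclist));
        });
}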
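
The same direction should apply to the POST example from the question. The following is only a sketch under stated assumptions: SaveTopicsSketch, Topic, STORE and save are hypothetical in-memory stand-ins for the controller, the Alltopics entity and topicRepository.save, so that the snippet runs with nothing but reactor-core. The point it shows is that flatMap replaces both Flux.create and the inner subscribe(), and that no explicit emitter.complete() is needed.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class SaveTopicsSketch {

    // Hypothetical, trimmed-down stand-in for the Alltopics entity.
    static final class Topic {
        String topicid;
        final String name;

        Topic(String topicid, String name) {
            this.topicid = topicid;
            this.name = name;
        }
    }

    // Hypothetical in-memory stand-in for the Cassandra table.
    static final Map<String, Topic> STORE = new ConcurrentHashMap<>();

    // Hypothetical stand-in for topicRepository.save(topic).
    static Mono<Topic> save(Topic topic) {
        return Mono.fromSupplier(() -> {
            STORE.put(topic.topicid, topic);
            return topic;
        });
    }

    static Flux<Topic> loadTopic(Flux<Topic> data) {
        return data
            // assign an id only when the request did not provide one
            .map(topic -> {
                if (topic.topicid == null || topic.topicid.isEmpty()) {
                    topic.topicid = UUID.randomUUID().toString();
                }
                return topic;
            })
            // the returned Flux completes once the input and every save have completed
            .flatMap(topic -> save(topic));
    }
}
```

In the actual controller, the equivalent would be to return data.map(...).flatMap(topicRepository::save) directly, leaving the subscription to the framework.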