Assume I have the Flux below, with Monos nested inside it. I have information in two different Apache Cassandra tables, and I want to merge the details and send them back as a Flux.

Refer to the updated pseudo-code below.

@Autowired FollowersRepository followersRepository;
@Autowired TopicRepository topicRepository;
    @GetMapping("/info")
    public Flux<FullDetails> getData(){
        return Flux.create(emitter ->{
            followersRepository.findAll() 
            .doOnNext(data -> {
                List<String> all = data.getTopiclist(); //will get list of topic id
                List<Alltopics> processedList = new ArrayList<Alltopics>();
                all.forEach(action -> {
                    topicRepository.findById(action) //will get full detail about topic
                    .doOnSuccess(topic ->{
                        processedList.add(topic);
                        if (processedList.size() >= all.size()) {
                            FullDetails fulldetails = new FullDetails(action,processedList);
                            emitter.next(fulldetails);
                            //emitter.complete();
                        }
                    })
                    .subscribe();
                });
            })
            .doOnComplete(() ->{
                System.out.println("All the data are processed !!!");
                //emitter.complete(); // executing if all the data are pushed from database not waiting for doOnNext method to complete.
            })
            .subscribe();
        });
    }

For more details, refer to the code here: CodeLink.

I have tried doOnComplete and doOnFinally on the outer Flux, but neither waits for all the inner non-blocking calls to complete.

I want to call onComplete only after all the nested Mono (non-blocking) requests inside the Flux have been processed.

How is this related to RxJava? Edited tags to use project-reactor instead. - akarnokd
I added it just to get ideas and suggestions from reactive programming users. I believe almost all operators in Flux are equivalent to those in Flowable, and Mono is equivalent to Single. I have no other justification for adding the rxjava tag here. - Prabu Subra
Did you find a solution? I need the previous results too and want to know when everything is complete. - wutzebaer

1 Answer

The Flux inside your emitter isn't actually doing anything, because nothing subscribes to it. An emitter generally reacts to an externally raised event, e.g. a message being received. You can add a subscribe() as below to make it work. Have a read on hot versus cold publishers: http://projectreactor.io/docs/core/snapshot/reference/#reactor.hotCold

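return Flux.create(emitter -> {
            Flux.just(1, 2, 3, 4, 5) // list of ids from database
                    .doOnNext(uuid -> {
                        this.getData(uuid).doOnSuccess(result -> {
                            System.out.println("query data from database " + uuid);
                            emitter.next("Data from database.");
                        })
                        .subscribe(); // the inner Mono is cold too; without this it never runs
                    })
                    .doOnComplete(() -> {
                        // fires when the ids are exhausted, not when the inner Monos finish
                        System.out.println("Not waiting for all the Nested Mono to complete. ");
                    })
                    .subscribe(); // nothing runs until the outer Flux is subscribed
        });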

If your call goes to the DB, you may not need to raise events through an emitter at all. You can compose the calls with flatMap instead, e.g.

public Flux<String> getAllData2() {
        return Flux.just(1, 2, 3, 4, 5)
                // flatMap subscribes to each inner Mono and merges the results
                .flatMap(uuid1 -> getData(uuid1)
                        .doOnSuccess(result -> System.out.println("query data from database " + result)))
                // with flatMap, completion is signalled only after every inner Mono has completed
                .doOnComplete(() -> System.out.println("All the nested Monos have completed."));
    }
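
Applied to the two-table case from the question, the same flatMap idea might look roughly like the sketch below. This is only an illustration under assumptions: the records Topic, Follower and FullDetails and the methods findAllFollowers and findTopicById are hypothetical in-memory stand-ins for the question's entities and Cassandra repositories, and records need Java 16 or later. For each follower, the topic ids are turned into a Flux, each id is resolved through a nested Mono, and collectList gathers the topics before a FullDetails is emitted.

```java
import java.util.List;
import java.util.Map;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class NestedMonoSketch {

    // Hypothetical stand-ins for the question's entity classes.
    record Topic(String id, String title) {}
    record Follower(String userId, List<String> topicIds) {}
    record FullDetails(String userId, List<Topic> topics) {}

    // In-memory stand-in for the topic table (assumption, not the real repository).
    static final Map<String, Topic> TOPICS = Map.of(
            "t1", new Topic("t1", "Java"),
            "t2", new Topic("t2", "Reactor"),
            "t3", new Topic("t3", "Cassandra"));

    // Stand-in for followersRepository.findAll().
    static Flux<Follower> findAllFollowers() {
        return Flux.just(
                new Follower("alice", List.of("t1", "t2")),
                new Follower("bob", List.of("t3")));
    }

    // Stand-in for topicRepository.findById(id); empty Mono if the id is unknown.
    static Mono<Topic> findTopicById(String id) {
        return Mono.justOrEmpty(TOPICS.get(id));
    }

    static Flux<FullDetails> getData() {
        return findAllFollowers()
                // one inner pipeline per follower; flatMap merges their results
                .flatMap(follower -> Flux.fromIterable(follower.topicIds())
                        // resolve each topic id through its nested Mono
                        .flatMap(NestedMonoSketch::findTopicById)
                        // emits a single list once all lookups for this follower are done
                        .collectList()
                        .map(topics -> new FullDetails(follower.userId(), topics)));
    }

    public static void main(String[] args) {
        getData()
                // reached only after every nested lookup has completed
                .doOnComplete(() -> System.out.println("All nested lookups finished"))
                .subscribe(System.out::println);
    }
}
```

No Flux.create or manual emitter.complete() is needed here, because the returned Flux completes on its own once all inner publishers have completed. Note that flatMap does not guarantee ordering when the inner calls are truly asynchronous; if order matters, flatMapSequential or concatMap could be used instead.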