I have a service which returns Flux<List<Integer>>
and I would like to convert it into Mono<List<Integer>>
to be used inside transform()
Here is what I did by using flatMap
and Mono.just()
:
private Mono<List<Integer>> filterAndMap(Mono<List<Integer>> listMono) {
return listMono.filter(integers -> integers.size() == 3)
.map(integers -> integers.stream()
.map(integer -> integer * 10)
.collect(Collectors.toList()));
}
void method1() {
Flux.just(Arrays.asList(1, 2, 3), Arrays.asList(4, 5, 6), Arrays.asList(7, 8, 9, 10))
.flatMap(integers -> this.filterAndMap(Mono.just(integers)))
.doOnNext(System.out::println)
.blockLast();
}
But what I would like is to use transform
instead:
void method2() {
Flux.just(Arrays.asList(1, 2, 3), Arrays.asList(4, 5, 6), Arrays.asList(7, 8, 9, 10))
.convertFluxToMono()?
.transform(this::filterAndMap)
.doOnNext(System.out::println)
.block();
}
Is there any operator or technique to make method2 to work?
As a result I expect onNext()
be called 2 times each with value of List
I know filterAndMap method could be simplified and be used without any complication but actual method is huge(just modified for clarity) and there are many operators in a chain that I want to reuse by avoiding duplications.
Update:
A little bit context what I want to achieve.
I have 2 services - one via Http which returns me Mono<List<Integer>>
and another one via Redis which returns Flux<List<Integer>>
.
For both cases I have the exact same functionality - chain of 10-15 operators and what I want to achieve is to avoid duplicate code.
For example:
void f1() {
Mono<List<Integer>> mono = getFromHttp();
mono
.map(integers -> integers.stream().collect(Collectors.groupingBy(Function.identity())))
.filter(entry -> entry.size() > 5)
//...
//many other operators
//...
//.flatMap()
//.switchIfEmpty()
//.doOnNext()
.retryWhen();
}
void f2() {
Flux<List<Integer>> flux = getFromRedis();
flux.
//...
//same functionality here
//...
}
Maybe better to not concentrate to convert Flux to Mono but instead convert Mono to Flux which I guess much easier?
Update2:
I have changed my existing filterAndMap into Flux. And instead of converting Flux to Mono I went with Mono to Flux
now I can call Flux.transform(this::filterAndMap)
and Mono.transform(mono -> this.filterAndMap(mono.flux()))
accordingly
private Flux<List<Integer>> filterAndMap(Flux<List<Integer>> listMono) {
return listMono.filter(integers -> integers.size() == 3)
.map(integers -> integers.stream()
.map(integer -> integer * 10)
.collect(Collectors.toList()));
}
Thanks @Michael Berry, even I have changed the implementation, your solution fully covers my initial question/issue. So I accept it.
And thanks to @Simon Baslé for a good call, I have redesign my transformer and went with safer approach(Mono to Flux)
transform()
is available on a flux as well, why are you trying to convert it to a mono? (the first method isn't really converting it to a mono either, you're just wrapping up each element of the flux into a mono before usingflatMap
, when you'd be better just usingmap
.) Appreciate this is a cut down example, but I think we need some more context to work out what your aim is. – Michael Berrytransform
is not a requirement I just didn't find other operator which will help to reuse some peace of code written in reactive. What I want is to use same functionality both for Mono and Flux, because the data I receive exactly the same and functionality I'm going to use is also the same. – SergMono
andFlux
without code duplication? – Michael BerryonNext
.next()
is a viable approach only if it is ok to drop extraneous elements in that case – Simon Baslé