
I have a service that returns Flux<List<Integer>>, and I would like to convert it into a Mono<List<Integer>> so that it can be used inside transform().

Here is what I did using flatMap and Mono.just():

private Mono<List<Integer>> filterAndMap(Mono<List<Integer>> listMono) {
  return listMono.filter(integers -> integers.size() == 3)
      .map(integers -> integers.stream()
          .map(integer -> integer * 10)
          .collect(Collectors.toList()));
}

void method1() {
  Flux.just(Arrays.asList(1, 2, 3), Arrays.asList(4, 5, 6), Arrays.asList(7, 8, 9, 10))
      .flatMap(integers -> this.filterAndMap(Mono.just(integers)))
      .doOnNext(System.out::println)
      .blockLast();
}

But what I would like is to use transform instead:

void method2() {
  Flux.just(Arrays.asList(1, 2, 3), Arrays.asList(4, 5, 6), Arrays.asList(7, 8, 9, 10))
      .convertFluxToMono()?
      .transform(this::filterAndMap)
      .doOnNext(System.out::println)
      .block();
}

Is there any operator or technique that would make method2 work? As a result, I expect onNext() to be called 2 times, each time with a List value.

I know the filterAndMap method could be simplified and used without any complication, but the actual method is huge (it is trimmed down here for clarity), and there are many operators in the chain that I want to reuse without duplication.

Update:

A little bit of context on what I want to achieve. I have 2 services: one over HTTP, which returns Mono<List<Integer>>, and another over Redis, which returns Flux<List<Integer>>. In both cases I need exactly the same functionality, a chain of 10-15 operators, and I want to avoid duplicating that code.

For example:

  void f1() {
    Mono<List<Integer>> mono = getFromHttp();
    
    mono
        .map(integers -> integers.stream().collect(Collectors.groupingBy(Function.identity())))
        .filter(entry -> entry.size() > 5)
        //...
        //many other operators
        //...
        //.flatMap()
        //.switchIfEmpty()
        //.doOnNext()
        .retryWhen();
  }

  void f2() {
    Flux<List<Integer>> flux = getFromRedis();
    
    flux.
        //...
        //same functionality here
        //...
  }

Maybe it is better not to concentrate on converting Flux to Mono, but instead to convert Mono to Flux, which I guess is much easier?

Update2:

I have changed my existing filterAndMap to work on Flux. Instead of converting Flux to Mono, I went with Mono to Flux.

Now I can call Flux.transform(this::filterAndMap) and Mono.transform(mono -> this.filterAndMap(mono.flux())) accordingly:

private Flux<List<Integer>> filterAndMap(Flux<List<Integer>> listFlux) {
  return listFlux.filter(integers -> integers.size() == 3)
      .map(integers -> integers.stream()
          .map(integer -> integer * 10)
          .collect(Collectors.toList()));
}
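
For reference, here is a minimal, self-contained sketch of this Mono-to-Flux approach, assuming reactor-core is on the classpath. The class name SharedPipelineSketch and the helpers fromFlux and fromMono are hypothetical: they stand in for the Redis and HTTP sources and collect the results into a list only so that they are easy to inspect. Unlike the Mono.transform call above, the Mono is widened with flux() before transform, so the result is explicitly a Flux.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class SharedPipelineSketch {

  // The shared operator chain, written once against Flux.
  static Flux<List<Integer>> filterAndMap(Flux<List<Integer>> lists) {
    return lists.filter(integers -> integers.size() == 3)
        .map(integers -> integers.stream()
            .map(integer -> integer * 10)
            .collect(Collectors.toList()));
  }

  // Flux source (hypothetical stand-in for the Redis call).
  static List<List<Integer>> fromFlux() {
    return Flux.just(Arrays.asList(1, 2, 3), Arrays.asList(4, 5, 6), Arrays.asList(7, 8, 9, 10))
        .transform(SharedPipelineSketch::filterAndMap)
        .collectList()
        .block();
  }

  // Mono source (hypothetical stand-in for the HTTP call):
  // widen it to a Flux first, then reuse the same chain.
  static List<List<Integer>> fromMono() {
    return Mono.just(Arrays.asList(1, 2, 3))
        .flux()
        .transform(SharedPipelineSketch::filterAndMap)
        .collectList()
        .block();
  }

  public static void main(String[] args) {
    System.out.println(fromFlux()); // [[10, 20, 30], [40, 50, 60]]
    System.out.println(fromMono()); // [[10, 20, 30]]
  }
}
```

Going from Mono to Flux never loses elements, which is why this direction needs no decision about what to do with extra onNext signals.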

Thanks @Michael Berry. Even though I have changed the implementation, your solution fully covers my initial question, so I accept it.

And thanks to @Simon Baslé for the good call: I have redesigned my transformer and went with the safer approach (Mono to Flux).

Not sure what you're trying to achieve here - transform() is available on a Flux as well, so why are you trying to convert it to a Mono? (The first method isn't really converting it to a Mono either; you're just wrapping each element of the Flux in a Mono before using flatMap, when you'd be better off just using map.) I appreciate this is a cut-down example, but I think we need some more context to work out what your aim is. - Michael Berry
Yes, I did that on purpose, to show that the way I implemented it with Mono.just() is not what I want. I have updated the post. transform is not a requirement; I just didn't find another operator that would help me reuse a piece of reactive code. What I want is to use the same functionality for both Mono and Flux, because the data I receive is exactly the same and the functionality I'm going to apply is also the same. - Serg
Ah, gotcha. So you want the same transformation applied to Mono and Flux without code duplication? - Michael Berry
You have to be extra careful with this idea. Applying an arbitrary Flux transform function to a Mono is indeed possible, but if you want the output to be a Mono, then you need to think about what happens if the Flux result of the transformation DOES produce more than one onNext. next() is a viable approach only if it is OK to drop extraneous elements in that case. - Simon Baslé
Thanks @Simon Baslé. I guess I ran into the issue you mentioned; it seems that what I wanted is kind of nonsense, and it is probably better to convert Mono to Flux instead of vice versa. Would that be safe? - Serg

1 Answer


As per the comments, I'm taking the question here to be "How do I apply the same transformation to both a Mono and a Flux without duplicating the code?"

You could use a simple utility function like so:

static <T> Function<Mono<T>, Mono<T>> toMonoTransformer(
    Function<Flux<T>, Flux<T>> transformer) {
  return listMono -> transformer.apply(listMono.flux()).next();
}

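This function works with any Flux-to-Flux transformer whose input and output element types match; note that next() keeps only the first element the transformed Flux emits. You can then use the transformation function (here, a filterAndMap that takes and returns a Flux) as-is for a Flux: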

void method2() {
  Flux.just(Arrays.asList(1, 2, 3), Arrays.asList(4, 5, 6), Arrays.asList(7, 8, 9, 10))
      .transform(this::filterAndMap)
      .doOnNext(System.out::println)
      .blockLast();
}

...and apply the utility method for the Mono:

void method1() {
  Mono.just(Arrays.asList(1, 2, 3))
      .transform(toMonoTransformer(this::filterAndMap))
      .doOnNext(System.out::println)
      .block();
}
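
As pointed out in the comments, next() silently drops anything after the first element. The sketch below, which assumes reactor-core is on the classpath, contrasts that with a stricter variant built on Flux.singleOrEmpty(), which accepts an empty result but signals an IndexOutOfBoundsException if the transformed Flux emits more than one element. The names NextVersusSingleSketch, toStrictMonoTransformer and splitHeadAndTail are hypothetical and exist only for illustration; splitHeadAndTail is a deliberately "misbehaving" transformer that emits two lists per input.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class NextVersusSingleSketch {

  // Lenient adapter from the answer: keeps the first element, drops the rest.
  static <T> Function<Mono<T>, Mono<T>> toMonoTransformer(
      Function<Flux<T>, Flux<T>> transformer) {
    return mono -> transformer.apply(mono.flux()).next();
  }

  // Hypothetical strict adapter: errors if more than one element is emitted.
  static <T> Function<Mono<T>, Mono<T>> toStrictMonoTransformer(
      Function<Flux<T>, Flux<T>> transformer) {
    return mono -> transformer.apply(mono.flux()).singleOrEmpty();
  }

  // Hypothetical transformer that emits two lists for every input list.
  static Flux<List<Integer>> splitHeadAndTail(Flux<List<Integer>> lists) {
    return lists.flatMap(list ->
        Flux.just(list.subList(0, 1), list.subList(1, list.size())));
  }

  // With next(), only the first emitted list survives.
  static List<Integer> lenientResult() {
    Function<Mono<List<Integer>>, Mono<List<Integer>>> lenient =
        toMonoTransformer(NextVersusSingleSketch::splitHeadAndTail);
    return Mono.just(Arrays.asList(1, 2, 3)).transform(lenient).block();
  }

  // With singleOrEmpty(), the second element surfaces as an error.
  static boolean strictFailsOnExtraElements() {
    Function<Mono<List<Integer>>, Mono<List<Integer>>> strict =
        toStrictMonoTransformer(NextVersusSingleSketch::splitHeadAndTail);
    try {
      Mono.just(Arrays.asList(1, 2, 3)).transform(strict).block();
      return false;
    } catch (IndexOutOfBoundsException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    System.out.println(lenientResult());              // [1]
    System.out.println(strictFailsOnExtraElements()); // true
  }
}
```

Which variant fits depends on whether extra elements are an expected case to discard or a bug worth surfacing; for a chain like filterAndMap, which emits at most one element per input, both behave the same.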