To achieve this, my initial approach is to make sure the Flux has completed using then(), and only after that manipulate the data.
This is the wrong way of thinking for reactive programming: you need to modify the data as it flows through the Flux. The then() methods ignore the results from the Flux entirely, and just output some other, unrelated Mono once the Flux completes.
When you want to take a Flux of one element type and reduce it down to a Mono of some other type, you most likely want the reduce() method. In your case it takes an initial Obj2 and a BiFunction whose purpose is to take the intermediate Obj2 and the next Obj1 from the Flux, and produce an updated Obj2. The reduce() operator then applies this reduction across the entire stream, giving you a Mono<Obj2> at the end.
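To make the shape of that reduction concrete without pulling in Reactor, here is a minimal synchronous sketch of the same fold: a seed value, plus a function that combines the running result with each element. The `fold` helper and the `FoldSketch` class are hypothetical names made up for this illustration, not Reactor API; the real reduce() does the same thing asynchronously and wraps the final value in a Mono.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

public class FoldSketch {

    // Hypothetical helper (not part of Reactor): a synchronous stand-in for
    // the fold that reduce(initial, accumulator) performs over a sequence.
    static <T, A> A fold(Iterable<T> items, A initial, BiFunction<A, T, A> accumulator) {
        A acc = initial;
        for (T item : items) {
            // Each step sees the intermediate result and the next element
            acc = accumulator.apply(acc, item);
        }
        return acc;
    }

    // Counts how often each string occurs, building a fresh map per step
    // instead of mutating the previous intermediate result.
    static Map<String, Integer> countOccurrences(List<String> values) {
        Map<String, Integer> empty = Map.of();
        return fold(values, empty, (counts, value) -> {
            Map<String, Integer> next = new HashMap<>(counts);
            next.merge(value, 1, Integer::sum);
            return next;
        });
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = countOccurrences(List.of("456", "456", "455"));
        System.out.println(counts.get("456")); // 2
        System.out.println(counts.get("455")); // 1
    }
}
```

The accumulator never needs to wait for the sequence to "finish" before it touches the data, which is the point: each element is folded in as it arrives.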
It's not immediately obvious from your code what specifically you want to achieve, but the following is a related example (Lombok used for brevity):
import java.util.HashMap;
import java.util.Map;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Data
@AllArgsConstructor
class Obj1 {
    private String a;
    private String combine;
    private String combine2;
}

@Data
@NoArgsConstructor
@AllArgsConstructor
class Obj2 {
    private String a;
    private Map<String, Integer> combine = new HashMap<>();
    private Map<String, Integer> combine2 = new HashMap<>();
}

public class NewClass {

    public static void main(String[] args) {
        Flux<Obj1> flux = Flux.just(
                new Obj1("123", "456", "789"),
                new Obj1("123", "456", "789"),
                new Obj1("123", "455", "789"));

        // Fold every Obj1 into a running Obj2, starting from an empty Obj2
        Mono<Obj2> mono = flux.reduce(new Obj2(), (o2, o1) -> {
            // Copy the maps so the previous intermediate Obj2 is not mutated
            Map<String, Integer> combine = new HashMap<>(o2.getCombine());
            combine.put(o1.getCombine(), combine.getOrDefault(o1.getCombine(), 0) + 1);

            Map<String, Integer> combine2 = new HashMap<>(o2.getCombine2());
            combine2.put(o1.getCombine2(), combine2.getOrDefault(o1.getCombine2(), 0) + 1);

            return new Obj2(o1.getA(), combine, combine2);
        });

        // Nothing runs until the Mono is subscribed to
        mono.subscribe(System.out::println);
    }
}