1
votes

I have some initial state in my application and a few policies that decorate this state with reactively fetched data (each policy's Mono returns a new instance of the state with additional data). Eventually I get a fully decorated state.

It basically looks like this:

public interface Policy {
    Mono<State> apply(State currentState);
}

Usage for a fixed number of policies would look like this:

Flux.just(baseState)
    .flatMap(firstPolicy::apply)
    .flatMap(secondPolicy::apply)
    ...
    .subscribe();

In other words, the input state for each Mono is the initial state accumulated with the results of all the Monos that precede it.

In my case the number of policies is not fixed; they come from another layer of the application as a collection of objects that implement the Policy interface.

Is there any way to achieve a result similar to the code above (with 2 flatMap calls), but for an unknown number of policies? I have tried Flux's reduce method, but it works only if the policy returns a value, not a Mono.


2 Answers

2
votes

This seems difficult because you're streaming your baseState, then trying to do an arbitrary number of flatMap() calls on that. There's nothing inherently wrong with using a loop to achieve this, but I like to avoid that unless absolutely necessary, as it breaks the natural reactive flow of the code.

If you instead iterate and reduce the policies into a single policy, then the flatMap() call becomes trivial:

Flux.fromIterable(policies)
        .reduce((p1,p2) -> s -> p1.apply(s).flatMap(p2::apply))
        .flatMap(p -> p.apply(baseState))
        .subscribe();

If you're able to edit your Policy interface, I'd strongly suggest adding a static combine() method to reference in your reduce() call to make that more readable:

interface Policy {
    Mono<State> apply(State currentState);

    public static Policy combine(Policy p1, Policy p2) {
        return s -> p1.apply(s).flatMap(p2::apply);
    }
}

The Flux then becomes much more descriptive and less verbose:

Flux.fromIterable(policies)
        .reduce(Policy::combine)
        .flatMap(p -> p.apply(baseState))
        .subscribe();

As a complete demonstration, swapping out your State for a String to keep it shorter:

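import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

interface Policy {
    Mono<String> apply(String currentState);

    // Chains two policies: the output of p1 becomes the input of p2
    static Policy combine(Policy p1, Policy p2) {
        return s -> p1.apply(s).flatMap(p2::apply);
    }
}

// Wrapper class so that main() compiles; the class name is arbitrary
public class Demo {
    public static void main(String[] args) {
        List<Policy> policies = new ArrayList<>();
        policies.add(x -> Mono.just("blah " + x));
        policies.add(x -> Mono.just("foo " + x));

        String baseState = "bar";
        Flux.fromIterable(policies)
                .reduce(Policy::combine)
                .flatMap(p -> p.apply(baseState))
                .subscribe(System.out::println); //Prints "foo blah bar"
    }
}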
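
One edge case worth keeping in mind: reduce() without a seed completes empty when the policy list is empty, so the base state would never be emitted. A minimal sketch of one way around that, assuming the String-based Policy from the demonstration above, is to seed the reduction with an identity policy. The class name SeededPolicyReduce and the helper applyAll are hypothetical names used only for this illustration:

```java
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical wrapper class, used only for this sketch
public class SeededPolicyReduce {

    @FunctionalInterface
    interface Policy {
        Mono<String> apply(String currentState);

        // Chains two policies: the output of p1 becomes the input of p2
        static Policy combine(Policy p1, Policy p2) {
            return s -> p1.apply(s).flatMap(p2::apply);
        }
    }

    // Hypothetical helper: folds all policies into one, then applies it
    static Mono<String> applyAll(List<Policy> policies, String baseState) {
        // Identity policy: returns the state unchanged, so an empty
        // policy list still yields the base state
        Policy identity = Mono::just;
        return Flux.fromIterable(policies)
                .reduce(identity, Policy::combine)
                .flatMap(p -> p.apply(baseState));
    }

    public static void main(String[] args) {
        List<Policy> policies = List.of(
                x -> Mono.just("blah " + x),
                x -> Mono.just("foo " + x));

        // block() is used here only to keep the demo short
        System.out.println(applyAll(policies, "bar").block());  // prints "foo blah bar"
        System.out.println(applyAll(List.of(), "bar").block()); // prints "bar"
    }
}
```

The seeded overload of reduce() always emits a value, so the final flatMap() runs even when there is nothing to combine.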
1
votes

If I understand the problem correctly, the simplest solution is to use a regular for loop:

Flux<State> flux = Flux.just(baseState);

for (Policy policy : policies)
{
    flux = flux.flatMap(policy::apply);
}

flux.subscribe();

Also, note that if you have just a single baseState, you can use Mono instead of Flux.
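
A rough sketch of that Mono-based variant, with State swapped for a String and each policy modelled as a plain Function<String, Mono<String>> (both are assumptions made to keep the example self-contained; the class name MonoPolicyLoop and the method applyPolicies are likewise made up for this illustration):

```java
import java.util.List;
import java.util.function.Function;

import reactor.core.publisher.Mono;

// Hypothetical class, used only for this sketch
public class MonoPolicyLoop {

    // Builds one Mono chain with a flatMap per policy. The loop only
    // assembles the pipeline; nothing runs until something subscribes.
    static Mono<String> applyPolicies(String baseState,
                                      List<Function<String, Mono<String>>> policies) {
        Mono<String> mono = Mono.just(baseState);
        for (Function<String, Mono<String>> policy : policies) {
            mono = mono.flatMap(policy);
        }
        return mono;
    }

    public static void main(String[] args) {
        List<Function<String, Mono<String>>> policies = List.of(
                s -> Mono.just(s + " -> first"),
                s -> Mono.just(s + " -> second"));

        applyPolicies("base", policies)
                .subscribe(System.out::println); // prints "base -> first -> second"
    }
}
```

With an empty list the loop body never runs, so the base state is emitted unchanged.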

UPDATE:

If you are concerned about breaking the flow, you can extract the for loop into a method and apply it via the transform operator:

Flux.just(baseState)
    .transform(this::applyPolicies)
    .subscribe();

private Publisher<State> applyPolicies(Flux<State> originalFlux)
{
    Flux<State> newFlux = originalFlux;

    for (Policy policy : policies)
    {
        newFlux = newFlux.flatMap(policy::apply);
    }

    return newFlux;
}