I found examples on the Internet, but they don't give me a full understanding. This is a standard CRUD application using WebFlux.

Router:

@Configuration
public class PersonRouter {

    @Bean
    public RouterFunction<ServerResponse> route(PersonHandler handler) {
        return RouterFunctions
                .route(GET("/getAllPersons").and(accept(MediaType.APPLICATION_JSON)), handler::findAll)
                .andRoute(GET("/getPerson/{id}").and(accept(MediaType.APPLICATION_JSON)), handler::findById)
                .andRoute(POST("/createPerson").and(accept(MediaType.APPLICATION_JSON)), handler::save)
                .andRoute(DELETE("/deletePerson/{id}").and(accept(MediaType.APPLICATION_JSON)), handler::delete);
    }

}

Handler:

@Component
public class PersonHandler {

    private final PersonService personService;

    public PersonHandler(PersonService personService) {
        this.personService = personService;
    }

    public Mono<ServerResponse> findById(ServerRequest request) {
        String id = request.pathVariable("id");
        return ok()
                .contentType(MediaType.APPLICATION_JSON)
                .body(personService.getById(id), Person.class);
    }

    public Mono<ServerResponse> findAll(ServerRequest request) {
        return ok()
                .contentType(MediaType.APPLICATION_JSON)
                .body(personService.getAll(), Person.class);
    }

    public Mono<ServerResponse> save(ServerRequest request) {
        final Mono<Person> person = request.bodyToMono(Person.class);
        return ok()
                .contentType(MediaType.APPLICATION_JSON)
                .body(fromPublisher(person.flatMap(personService::save), Person.class));
    }

    public Mono<ServerResponse> delete(ServerRequest request) {
        String id = request.pathVariable("id");
        return ok()
                .contentType(MediaType.APPLICATION_JSON)
                .body(personService.delete(id), Void.class);
    }

}

Repository:

@Repository
public interface PersonRepository extends ReactiveMongoRepository<Person, String> {
}

Service:

@Service
@Transactional
@AllArgsConstructor
public class PersonService {

    private final PersonRepository personRepository;

    public Flux<Person> getAll() {
        return personRepository.findAll();
    }

    public Mono<Person> getById(final String id) {
        return personRepository.findById(id);
    }

    public Mono<Person> update(final String id, final Person person) {
        return personRepository.save(person);
    }

    public Mono<Person> save(final Person person) {
        return personRepository.save(person);
    }

    public Mono delete(final String id) {
        // A Mono reference is never null and never emits null, so no null checks are needed;
        // an empty Mono simply skips the flatMap.
        return getById(id)
                .flatMap(personToBeDeleted -> personRepository
                        .delete(personToBeDeleted).thenReturn(personToBeDeleted));
    }
}

I understand everything except the save and update methods. I don't understand why we use flatMap in this situation. Why is that, and how can I implement the update method in my Handler?

Update

Let's look at the save() method in the Handler:

    public Mono<ServerResponse> save(ServerRequest request) {
        final Mono<Person> person = request.bodyToMono(Person.class);
        return ok()
                .contentType(MediaType.APPLICATION_JSON)
                .body(fromPublisher(person.flatMap(personService::save), Person.class));
    }

I think the point is that we already have:

final Mono<Person> person = request.bodyToMono(Person.class);

and then we apply:

personService::save

With map, the result would be a Mono<Mono<Person>>.

flatMap is just like map, except that it unpacks the return value of the given lambda if that value is itself contained in a Publisher<T>. In our case, the personService.save(T) method returns a Mono<T>. If we’d used map instead of flatMap, we’d have a Mono<Mono<T>>, when what we really want is a Mono<T>. We can cleanly solve this problem using flatMap.

Am I right or is this statement wrong?
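
The same nesting can be reproduced without Reactor. The following is only an analogy, assuming that CompletableFuture.thenApply plays the role of Mono.map and CompletableFuture.thenCompose the role of Mono.flatMap. The class MapVsFlatMapSketch and the method fakeSave are hypothetical stand-ins for the handler and personService.save, not part of the code above.

```java
import java.util.concurrent.CompletableFuture;

public class MapVsFlatMapSketch {

    // Hypothetical stand-in for personService.save: an asynchronous call
    // that returns a wrapped result rather than a plain value.
    static CompletableFuture<String> fakeSave(String name) {
        return CompletableFuture.supplyAsync(() -> "saved:" + name);
    }

    // Analogue of map: the wrapper returned by fakeSave gets wrapped again.
    static CompletableFuture<CompletableFuture<String>> nested(CompletableFuture<String> body) {
        return body.thenApply(MapVsFlatMapSketch::fakeSave);
    }

    // Analogue of flatMap: the inner wrapper is unpacked, leaving one layer.
    static CompletableFuture<String> flat(CompletableFuture<String> body) {
        return body.thenCompose(MapVsFlatMapSketch::fakeSave);
    }

    public static void main(String[] args) {
        CompletableFuture<String> body = CompletableFuture.completedFuture("Ann");
        System.out.println(nested(body).join().join()); // two layers to unwrap
        System.out.println(flat(body).join());          // one layer to unwrap
    }
}
```

The return types are the point here: nested has two layers of wrapper and flat has one, which mirrors Mono<Mono<Person>> versus Mono<Person>.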

Your update is exactly right; you can add that as a self-answer ;) - Simon Baslé
Thank you! Could you also help me understand why I need to use .body(fromPublisher)? Thanks - Kirill Sereda
WebFlux is entirely non-blocking. It sends the headers first, and can then send a body that is not yet entirely "computed", by sending chunks to the remote client as the corresponding data is generated and emitted by the Publisher you pass to the body method. - Simon Baslé
Thanks for your answer! Could you give me a link to read about this part, i.e. .body(fromPublisher)? Thanks - Kirill Sereda

1 Answer

Why you need flatMap

These are my thoughts; the answer depends on whether you are working with a Mono or a Flux.

1. The Javadoc of map and flatMap describes their usage:

map: Transform the item emitted by this {@link Mono} by applying a synchronous function to it.

flatMap: Transform the item emitted by this {@link Mono} asynchronously, returning the value emitted by another {@link Mono} (possibly changing the value type).

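In other words, if you think of map and flatMap as pipeline stages with an input and an output, use map when the transformation is synchronous and returns a plain value, and use flatMap when the transformation itself returns a Mono (for example, a call to another reactive API). Consider this: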

public Mono<ServerResponse> influCRUD(ServerRequest req) {
    return req.bodyToMono(S.class) // the pipeline begins with an S
       .map(s -> {s.setF1(f1); s.setF2(f2); return s;}) // synchronous change to s, plain value returned: use map
       .flatMap(s -> webClient // the call returns a Mono<T>: use flatMap
            .post()
            .uri(uri)
            .body(BodyInserters.fromObject(s))
            .retrieve()
            .bodyToMono(T.class)
        ).flatMap(t -> ServerResponse // body(...) returns a Mono<ServerResponse>: use flatMap again
           .ok()
           .contentType(MediaType.APPLICATION_JSON)
           .body(BodyInserters.fromObject(t))
        );
}

It is worth mentioning that map can also produce an object of a different type; what matters is whether the function returns a plain value or a publisher.

2. flatMap flattens one publisher per item

The reasoning above applies to a Mono. For a Flux, both map and flatMap are applied to every item, much as they are with Java streams: map turns each item into one value, while flatMap turns each item into a Publisher and merges the results into a single Flux. Use flatMap when processing each item is itself a reactive call.

3. flatMap removes one layer of Mono for you

Look at their declaration:

<R> Mono<R> map(Function<? super T, ? extends R> mapper)

and

<R> Mono<R> flatMap(Function<? super T, ? extends Mono<? extends R>> transformer)

A Function is nothing more than a -> b. When b is the output of another Publisher (which is very likely in reactive programming), as in the webClient part of the earlier example, it comes as a Mono or Flux. In that case flatMap returns a Mono<R>, whereas map would return a Mono<Mono<R>>, as the declarations show.
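
For the Flux case mentioned above, java.util.stream gives a dependency-free way to see the per-item flattening. This is a sketch by analogy only, not Reactor code: Stream.flatMap concatenates the inner streams in order, whereas Flux.flatMap subscribes to the inner publishers and may interleave their results. The class FlattenSketch and the method tags are hypothetical.

```java
import java.util.List;
import java.util.stream.Collectors;

public class FlattenSketch {

    // Hypothetical per-item operation that yields several values per input.
    static List<String> tags(String name) {
        return List.of(name + "-a", name + "-b");
    }

    // map keeps one output per item, so the result stays nested.
    static List<List<String>> mapped(List<String> names) {
        return names.stream()
                .map(FlattenSketch::tags)
                .collect(Collectors.toList());
    }

    // flatMap maps each item to a stream and flattens them into one.
    static List<String> flattened(List<String> names) {
        return names.stream()
                .flatMap(name -> tags(name).stream())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> names = List.of("x", "y");
        System.out.println(mapped(names));    // [[x-a, x-b], [y-a, y-b]]
        System.out.println(flattened(names)); // [x-a, x-b, y-a, y-b]
    }
}
```

Both operators visit every item; the difference is only whether the per-item result stays wrapped or is merged into the outer sequence.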

I am a beginner in reactive programming too, so corrections are more than welcome.