
I have the following scenario for a data loader in a WebFlux application using the reactive MongoDB driver and Spring:

  1. create X objects of type B
  2. create Y objects of type A: each A contains an array field and a reference to an object of type B, which is chosen randomly from the objects created in step 1
  3. add N entries to the array of each object created in step 2

The problem I am facing seems to be parallel execution of the Mono/Flux, which, as I understand it, should not happen: according to the documentation, operations are always executed in sequence unless specified otherwise.
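A short sketch of why the steps overlap, assuming only reactor-core on the classpath: flatMap subscribes to each inner publisher as soon as its source element arrives (up to 256 at a time by default) and emits results in completion order, while concatMap subscribes to the next inner publisher only after the previous one has completed. slowCall is a hypothetical stand-in for a repository call, with delayElement simulating database latency.

```kotlin
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import java.time.Duration

// Hypothetical stand-in for a repository call: item i completes after (4 - i) * 50 ms,
// so later items finish first.
fun slowCall(i: Int): Mono<Int> =
    Mono.just(i).delayElement(Duration.ofMillis((4 - i) * 50L))

// flatMap subscribes to all three calls immediately and emits in completion order.
fun withFlatMap(): List<Int> =
    Flux.range(1, 3).flatMap { slowCall(it) }.collectList().block()!!

// concatMap subscribes to the next call only after the previous one has completed,
// so the output order matches the source order.
fun withConcatMap(): List<Int> =
    Flux.range(1, 3).concatMap { slowCall(it) }.collectList().block()!!
```

With these delays withFlatMap() will most likely return [3, 2, 1], because the shortest delay finishes first, whereas withConcatMap() always returns [1, 2, 3]. The same eager subscription likely explains why the ten "Comment size" lines in the log appear together: every findById was started before the first one returned.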

Can someone please give me a hint what I am doing wrong?

Here is an example code snippet. Object A is a toilet, object B is a user, and the array field is the comments field:

Flux.range(0, 10)
            // create 10 objects of type user
            .flatMap {
                LOG.debug("Creating user $it")
                userRepository.save(
                    User(
                        id = ObjectId(),
                        name = userNames.random(),
                        email = "${userNames.random()}@mail.com"
                    )
                )
            }
            .collectList()
            // create 2 objects of type toilet
            .flatMapMany { userList ->
                Flux.range(0, 2).zipWith(Flux.range(0, 2).map { userList })
            }
            .flatMap {
                LOG.debug("Creating toilet ${it.t1}")
                val userList = it.t2

                toiletRepository.save(
                    Toilet(
                        id = ObjectId(),
                        title = userList.random().name
                    )
                )
            }
            // add 5 entries to array of toilet
            .flatMap { toilet ->
                Flux.range(0, 5).zipWith(Flux.range(0, 5).map { toilet })
            }
            .flatMap { tuple ->
                val toilet = tuple.t2
                LOG.debug("Creating comment ${tuple.t1} for toilet $toilet")

                // get current values from toilet
                toiletRepository.findById(toilet.id).map {
                    // and push a new element to the comments array
                    LOG.debug("Comment size ${it.commentRefs.size}")
                    toiletRepository.save(it.apply { commentRefs.add(ObjectId()) })
                }
            }
            .subscribe {
                GlobalScope.launch {
                    exitProcess(SpringApplication.exit(context))
                }
            }

Executing this code produces the following log:

2020-11-15 19:42:54.197 DEBUG 13524 --- [           main] c.g.q.t.DataLoaderRunner                 : Creating user 0
2020-11-15 19:42:54.293 DEBUG 13524 --- [           main] c.g.q.t.DataLoaderRunner                 : Creating user 1
2020-11-15 19:42:54.295 DEBUG 13524 --- [           main] c.g.q.t.DataLoaderRunner                 : Creating user 2
2020-11-15 19:42:54.296 DEBUG 13524 --- [           main] c.g.q.t.DataLoaderRunner                 : Creating user 3
2020-11-15 19:42:54.300 DEBUG 13524 --- [           main] c.g.q.t.DataLoaderRunner                 : Creating user 4
2020-11-15 19:42:54.301 DEBUG 13524 --- [           main] c.g.q.t.DataLoaderRunner                 : Creating user 5
2020-11-15 19:42:54.304 DEBUG 13524 --- [           main] c.g.q.t.DataLoaderRunner                 : Creating user 6
2020-11-15 19:42:54.310 DEBUG 13524 --- [           main] c.g.q.t.DataLoaderRunner                 : Creating user 7
2020-11-15 19:42:54.316 DEBUG 13524 --- [           main] c.g.q.t.DataLoaderRunner                 : Creating user 8
2020-11-15 19:42:54.318 DEBUG 13524 --- [           main] c.g.q.t.DataLoaderRunner                 : Creating user 9
2020-11-15 19:42:54.348 DEBUG 13524 --- [tLoopGroup-3-10] c.g.q.t.DataLoaderRunner                 : Creating toilet 0
2020-11-15 19:42:54.380 DEBUG 13524 --- [tLoopGroup-3-10] c.g.q.t.DataLoaderRunner                 : Creating toilet 1
2020-11-15 19:42:54.386 DEBUG 13524 --- [ntLoopGroup-3-8] c.g.q.t.DataLoaderRunner                 : Creating comment 0 for toilet Toilet(id=5fb176aef24f4c248fbb051c, title=wholesale, location=Point [x=0.000000, y=0.000000], previewID=null, averageRating=0.0, ratingRefs=[], disabled=false, toiletCrewApproved=false, description=, commentRefs=[], imageRefs=[])
2020-11-15 19:42:54.405 DEBUG 13524 --- [ntLoopGroup-3-8] c.g.q.t.DataLoaderRunner                 : Creating comment 1 for toilet Toilet(id=5fb176aef24f4c248fbb051c, title=wholesale, location=Point [x=0.000000, y=0.000000], previewID=null, averageRating=0.0, ratingRefs=[], disabled=false, toiletCrewApproved=false, description=, commentRefs=[], imageRefs=[])
2020-11-15 19:42:54.406 DEBUG 13524 --- [ntLoopGroup-3-8] c.g.q.t.DataLoaderRunner                 : Creating comment 2 for toilet Toilet(id=5fb176aef24f4c248fbb051c, title=wholesale, location=Point [x=0.000000, y=0.000000], previewID=null, averageRating=0.0, ratingRefs=[], disabled=false, toiletCrewApproved=false, description=, commentRefs=[], imageRefs=[])
2020-11-15 19:42:54.407 DEBUG 13524 --- [ntLoopGroup-3-8] c.g.q.t.DataLoaderRunner                 : Creating comment 3 for toilet Toilet(id=5fb176aef24f4c248fbb051c, title=wholesale, location=Point [x=0.000000, y=0.000000], previewID=null, averageRating=0.0, ratingRefs=[], disabled=false, toiletCrewApproved=false, description=, commentRefs=[], imageRefs=[])
2020-11-15 19:42:54.409 DEBUG 13524 --- [ntLoopGroup-3-8] c.g.q.t.DataLoaderRunner                 : Creating comment 4 for toilet Toilet(id=5fb176aef24f4c248fbb051c, title=wholesale, location=Point [x=0.000000, y=0.000000], previewID=null, averageRating=0.0, ratingRefs=[], disabled=false, toiletCrewApproved=false, description=, commentRefs=[], imageRefs=[])
2020-11-15 19:42:54.410 DEBUG 13524 --- [ntLoopGroup-3-8] c.g.q.t.DataLoaderRunner                 : Creating comment 0 for toilet Toilet(id=5fb176aef24f4c248fbb051d, title=imaginary, location=Point [x=0.000000, y=0.000000], previewID=null, averageRating=0.0, ratingRefs=[], disabled=false, toiletCrewApproved=false, description=, commentRefs=[], imageRefs=[])
2020-11-15 19:42:54.412 DEBUG 13524 --- [ntLoopGroup-3-8] c.g.q.t.DataLoaderRunner                 : Creating comment 1 for toilet Toilet(id=5fb176aef24f4c248fbb051d, title=imaginary, location=Point [x=0.000000, y=0.000000], previewID=null, averageRating=0.0, ratingRefs=[], disabled=false, toiletCrewApproved=false, description=, commentRefs=[], imageRefs=[])
2020-11-15 19:42:54.413 DEBUG 13524 --- [ntLoopGroup-3-8] c.g.q.t.DataLoaderRunner                 : Creating comment 2 for toilet Toilet(id=5fb176aef24f4c248fbb051d, title=imaginary, location=Point [x=0.000000, y=0.000000], previewID=null, averageRating=0.0, ratingRefs=[], disabled=false, toiletCrewApproved=false, description=, commentRefs=[], imageRefs=[])
2020-11-15 19:42:54.414 DEBUG 13524 --- [ntLoopGroup-3-8] c.g.q.t.DataLoaderRunner                 : Creating comment 3 for toilet Toilet(id=5fb176aef24f4c248fbb051d, title=imaginary, location=Point [x=0.000000, y=0.000000], previewID=null, averageRating=0.0, ratingRefs=[], disabled=false, toiletCrewApproved=false, description=, commentRefs=[], imageRefs=[])
2020-11-15 19:42:54.415 DEBUG 13524 --- [ntLoopGroup-3-8] c.g.q.t.DataLoaderRunner                 : Creating comment 4 for toilet Toilet(id=5fb176aef24f4c248fbb051d, title=imaginary, location=Point [x=0.000000, y=0.000000], previewID=null, averageRating=0.0, ratingRefs=[], disabled=false, toiletCrewApproved=false, description=, commentRefs=[], imageRefs=[])
2020-11-15 19:42:54.425 DEBUG 13524 --- [tLoopGroup-3-10] c.g.q.t.DataLoaderRunner                 : Comment size 0
2020-11-15 19:42:54.425 DEBUG 13524 --- [ntLoopGroup-3-8] c.g.q.t.DataLoaderRunner                 : Comment size 0
2020-11-15 19:42:54.425 DEBUG 13524 --- [ntLoopGroup-3-6] c.g.q.t.DataLoaderRunner                 : Comment size 0
2020-11-15 19:42:54.425 DEBUG 13524 --- [ntLoopGroup-3-2] c.g.q.t.DataLoaderRunner                 : Comment size 0
2020-11-15 19:42:54.425 DEBUG 13524 --- [ntLoopGroup-3-3] c.g.q.t.DataLoaderRunner                 : Comment size 0
2020-11-15 19:42:54.425 DEBUG 13524 --- [ntLoopGroup-3-9] c.g.q.t.DataLoaderRunner                 : Comment size 0
2020-11-15 19:42:54.425 DEBUG 13524 --- [ntLoopGroup-3-7] c.g.q.t.DataLoaderRunner                 : Comment size 0
2020-11-15 19:42:54.429 DEBUG 13524 --- [ntLoopGroup-3-2] c.g.q.t.DataLoaderRunner                 : Comment size 0
2020-11-15 19:42:54.429 DEBUG 13524 --- [ntLoopGroup-3-9] c.g.q.t.DataLoaderRunner                 : Comment size 0
2020-11-15 19:42:54.464 DEBUG 13524 --- [tLoopGroup-3-10] c.g.q.t.DataLoaderRunner                 : Comment size 0

I now have three questions:

  1. Why does the thread switch from main to the LoopGroup threads? If everything is executed in sequence, shouldn't it avoid multi-threading altogether?
  2. Why are the "Comment size" log messages grouped together at the end?
  3. How do I correctly push elements to the array using the reactive Mongo repository implementation?
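Regarding the first question: Reactor itself does not switch threads. A thread change only happens when an operator such as publishOn, subscribeOn or a time-based operator, or an asynchronous source such as a database driver, emits from another thread; every later step then runs on whichever thread delivered the signal. Flux.range emits on the subscribing thread (main in the log), and the reactive MongoDB driver signals its results on its own I/O threads (the ...LoopGroup-3-* names), so everything after a repository call continues there. The sketch assumes reactor-core and uses delayElement, which emits on Schedulers.parallel() workers, as a stand-in for the driver's I/O threads; threadTrace is a hypothetical helper.

```kotlin
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import java.time.Duration
import java.util.Collections

// Records the thread each stage ran on, in the order the stages executed.
fun threadTrace(): List<String> {
    val trace = Collections.synchronizedList(mutableListOf<String>())
    Flux.range(0, 2)
        // Flux.range emits synchronously on the thread that subscribes (the caller of blockLast()).
        .doOnNext { trace.add("range:" + Thread.currentThread().name) }
        // Stand-in for a driver call: the element is re-emitted on a Schedulers.parallel() worker.
        .concatMap { Mono.just(it).delayElement(Duration.ofMillis(10)) }
        // Runs on whichever thread delivered the element, i.e. a parallel-N worker.
        .doOnNext { trace.add("after:" + Thread.currentThread().name) }
        .blockLast()
    return trace.toList()
}
```

Calling threadTrace() from main should therefore record the "range" entries on main and the "after" entries on parallel-N threads, mirroring the switch from main to the driver threads in the log.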

Any hints are appreciated. I assume that nesting findById and save is not correct, but how would you write it differently? Since save requires an entity, I need to pass in the latest version of the entity, which contains one additional element in the array. I try to achieve that by fetching the latest version with findById and modifying it directly with 'map -> save'.
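A minimal sketch of the 'map -> save' problem, assuming reactor-core and using a hypothetical in-memory FakeToiletRepository and a reduced Toilet class instead of Spring Data (reads return copies and are delayed to mimic a MongoDB round trip). Calling save inside map yields a Mono<Mono<Toilet>> whose inner save is never subscribed, so nothing is written; flatMap subscribes to it. Separately, running the five read-modify-write steps with flatMap lets them all read the same empty array, whereas concatMap runs them one after another.

```kotlin
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import java.time.Duration
import java.util.concurrent.ConcurrentHashMap

// Hypothetical, reduced version of the question's Toilet document.
data class Toilet(val id: Int, val commentRefs: MutableList<Int> = mutableListOf())

// Hypothetical in-memory stand-in for a reactive repository.
class FakeToiletRepository {
    private val store = ConcurrentHashMap<Int, Toilet>()

    // Stores a copy when subscribed, like a real save.
    fun save(toilet: Toilet): Mono<Toilet> = Mono.defer {
        store[toilet.id] = toilet.copy(commentRefs = toilet.commentRefs.toMutableList())
        Mono.just(toilet)
    }

    // Reads the current state at subscription time and delivers a copy 20 ms later.
    fun findById(id: Int): Mono<Toilet> =
        Mono.defer {
            val current = store[id]
            if (current == null) Mono.empty<Toilet>()
            else Mono.just(current.copy(commentRefs = current.commentRefs.toMutableList()))
        }.delayElement(Duration.ofMillis(20))
}

// Shape from the question: map wraps the save Mono instead of subscribing to it.
fun addCommentWithMap(repo: FakeToiletRepository, id: Int, comment: Int): Mono<Mono<Toilet>> =
    repo.findById(id).map { repo.save(it.apply { commentRefs.add(comment) }) }

// flatMap subscribes to the Mono returned by save, so the write actually happens.
fun addComment(repo: FakeToiletRepository, id: Int, comment: Int): Mono<Toilet> =
    repo.findById(id).flatMap { repo.save(it.apply { commentRefs.add(comment) }) }

fun commentsAfterMapVariant(): Int {
    val repo = FakeToiletRepository()
    repo.save(Toilet(id = 1)).block()
    addCommentWithMap(repo, 1, 0).block() // emits the inner save Mono; nobody subscribes to it
    return repo.findById(1).block()!!.commentRefs.size
}

// Five appends to the same document, either sequentially (concatMap) or all at once (flatMap).
fun addFiveComments(sequential: Boolean): Int {
    val repo = FakeToiletRepository()
    repo.save(Toilet(id = 1)).block()
    val steps = Flux.range(0, 5)
    val writes = if (sequential) steps.concatMap { addComment(repo, 1, it) }
                 else steps.flatMap { addComment(repo, 1, it) }
    writes.blockLast()
    return repo.findById(1).block()!!.commentRefs.size
}
```

Here commentsAfterMapVariant() leaves the array empty, addFiveComments(sequential = true) stores five entries, and addFiveComments(sequential = false) ends with a single entry because every step read the empty array. If the appends should stay concurrent, an atomic MongoDB update such as ReactiveMongoTemplate.updateFirst with an Update().push("commentRefs", ...) update avoids the read-modify-write race; it needs a running MongoDB instance and is not part of this sketch.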

Thank you all!


1 Answer


I am not sure whether this is the best way to do it, but I was able to achieve what I wanted by splitting the operations into functions so that they are grouped more logically.

Here are the snippets for the following operations:

  1. create users
  2. create comments
  3. create ratings
private fun createUsers() = Flux.range(0, userNames.size + 1)
        .flatMap {
            if (it < userNames.size) {
                LOG.debug("Creating user $it")
                userRepository.save(
                    User(
                        id = ObjectId(),
                        name = userNames[it],
                        email = "${userNames[it]}@mail.com"
                    )
                )
            } else {
                LOG.debug("Creating dev-user")
                userRepository.save(
                    User(
                        id = ObjectId("000000000000012343456789"),
                        name = "devuser",
                        email = "[email protected]"
                    )
                )
            }
        }
        .collectList()
private fun createComments(users: List<User>) = Flux.range(0, numComments)
        .flatMap {
            LOG.debug("Creating comment $it")
            commentRepository.save(
                Comment(
                    id = ObjectId(),
                    text = commentTexts.random(),
                    userRef = users.random().id
                )
            )
        }
        .collectList()
private fun createRatings(users: List<User>) = Flux.range(0, numRatings)
        .flatMap {
            LOG.debug("Creating rating $it")
            ratingRepository.save(
                Rating(
                    id = ObjectId(),
                    userRef = users.random().id,
                    value = Random.nextInt(0, 5)
                )
            )
        }
        .collectList()

And finally, creating the toilets with the results from above:

private fun createToilets(comments: List<Comment>, ratings: List<Rating>) = Flux.range(0, numToilets)
        .flatMap {
            val toilet = Toilet(
                id = ObjectId(),
                title = titles.random(),
                location = GeoJsonPoint(Random.nextDouble(10.0, 20.0), Random.nextDouble(45.0, 55.0)),
                description = descriptions.random()
            )

            // add comments
            val commentsToAdd = Random.nextInt(0, comments.size)
            for (i in 0 until commentsToAdd) {
                toilet.commentRefs.add(comments[i].id)
            }

            // add average rating and rating references
            val ratingsToAdd = Random.nextInt(0, ratings.size)
            for (i in 0 until ratingsToAdd) {
                toilet.ratingRefs.add(ratings[i].id)
                toilet.averageRating += ratings[i].value
            }
            if (toilet.ratingRefs.isNotEmpty()) {
                toilet.averageRating /= toilet.ratingRefs.size
            }

            LOG.debug("Creating toilet $it with $commentsToAdd comments and $ratingsToAdd ratings")

            toiletRepository.save(toilet)
        }
        // upload preview image
        .flatMap { toilet ->
            val imageName = "toilet${Random.nextInt(1, 10)}.jpg"
            imageService.store(
                Callable {
                    DataLoaderRunner::class.java.getResourceAsStream("/sample-images/$imageName")
                },
                "${toilet.title}-preview"
            ).zipWith(Mono.just(toilet))
        }
        // set preview image
        .flatMap {
            val imageId = it.t1
            val toilet = it.t2

            toiletRepository.save(toilet.copy(previewID = imageId))
        }
        .collectList()

This is the final reactive operation chain:

createUsers()
            .flatMap { users ->
                createComments(users).map { comments ->
                    Tuples.of(users, comments)
                }
            }
            .flatMap {
                val users = it.t1
                val comments = it.t2

                createRatings(users).map { ratings ->
                    Tuples.of(comments, ratings)
                }
            }
            .flatMap {
                val comments = it.t1
                val ratings = it.t2

                createToilets(comments, ratings)
            }
            // close application when all toilets are processed
            .subscribe {
                GlobalScope.launch {
                    exitProcess(SpringApplication.exit(context))
                }
            }
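Since comments and ratings both depend only on the users, the Tuples.of plumbing in this chain could likely be flattened with Mono.zip, which subscribes to both Monos and combines their results into a Tuple2. The sketch assumes reactor-core and replaces the repository-backed helpers with hypothetical stubs returning lists of strings and numbers; they are placeholders, not the implementations from this answer.

```kotlin
import reactor.core.publisher.Mono

// Hypothetical stubs standing in for the repository-backed helpers above.
fun createUsers(): Mono<List<String>> = Mono.just(listOf("alice", "bob"))
fun createComments(users: List<String>): Mono<List<String>> = Mono.just(users.map { "comment by $it" })
fun createRatings(users: List<String>): Mono<List<Int>> = Mono.just(users.map { it.length })
fun createToilets(comments: List<String>, ratings: List<Int>): Mono<List<String>> =
    Mono.just(listOf("toilet with ${comments.size} comments and ${ratings.size} ratings"))

// Same dependency graph as the chain above: users first, then comments and ratings
// (independent of each other, so zipped), then toilets.
fun loadAll(): Mono<List<String>> =
    createUsers()
        .flatMap { users -> Mono.zip(createComments(users), createRatings(users)) }
        .flatMap { createToilets(it.t1, it.t2) }
```

Mono.zip subscribes to both sources at the same time, which is only safe because comments and ratings do not depend on each other; steps with a real dependency still need flatMap.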

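I am not sure whether this is the best way to do it, but it works. The approach in the original post uses nested map/flatMap operations, which should be avoided anyway, and they are probably the reason it was not working: calling save inside map (instead of flatMap) produces a nested Mono that nothing subscribes to, so that save never runs.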