
I have a Flux of strings that should be converted to a Flux of DTOs. Parsing can fail, and by the business rules I just need to skip such entries.

If I use Kotlin's null, I get an NPE, because by design Reactor doesn't accept nulls in .map:

fun toDtoFlux(source: Flux<String>): Flux<Dto> {
    // This is the attempt that fails: map throws an NPE as soon as parse returns null
    return source.map(Parser::parse)
          .filter { it != null }
}

object Parser{
   fun parse(line:String):Dto?{
   ..
 }
}

I can use Optional, but that is not the Kotlin way.

fun toDtoFlux(source: Flux<String>): Flux<Dto> {
    return source.map(Parser::parse)
          .filter { it.isPresent }  // drop entries that failed to parse
          .map { it.get() }         // unwrap the remaining values
}

object Parser{
   fun parse(line:String):Optional<Dto>{
   ..
 }
}

What is the most idiomatic way to handle such cases in Kotlin?

2 Answers

1
votes

The solutions I see:

Using Reactor API

I'd suggest using the Reactor API to address this case: make your parser return a Mono. An empty Mono represents the absence of a result. With that, you can use flatMap instead of chaining map/filter/map.

It may seem like overkill, but it allows any parser implementation to do asynchronous work in the future if needed (fetching information from a third-party service, waiting for validation from a user, etc.).

It also provides a powerful API for managing parsing errors, since you can define backoff or custom error policies on the parsing result.

Your example would then become:

fun interface Parser {
    fun parse(record: String): Mono<Dto>
}

fun Parser.toDtoFlux(source: Flux<String>): Flux<Dto> {
    // An empty Mono from parse simply contributes nothing to the resulting Flux
    return source.flatMap(this::parse)
}
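
For the skip-on-failure rule from the question, a parser built this way could turn a parsing exception into an empty Mono. The following is only a minimal sketch: the Dto with a single Int field and the intParser value are hypothetical stand-ins, not taken from the question.

```kotlin
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono

// Hypothetical DTO standing in for the question's Dto.
data class Dto(val value: Int)

fun interface Parser {
    fun parse(record: String): Mono<Dto>
}

// Hypothetical parser: fromCallable captures the exception thrown by toInt()
// as an error signal, and onErrorResume replaces that error with an empty Mono.
val intParser = Parser { record ->
    Mono.fromCallable { Dto(record.trim().toInt()) }
        .onErrorResume { Mono.empty<Dto>() }
}

fun Parser.toDtoFlux(source: Flux<String>): Flux<Dto> =
    source.flatMap { parse(it) }

fun main() {
    val dtos = intParser.toDtoFlux(Flux.just("1", "oops", "3"))
        .collectList()
        .block()
    println(dtos) // [Dto(value=1), Dto(value=3)]
}
```

If failures should be logged or counted rather than silently dropped, that could be done inside the onErrorResume lambda before returning the empty Mono.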

Using sealed class

Kotlin offers other ways of modeling results, inspired by functional programming. One is to use a sealed class to design the set of cases that parsing can produce. This allows rich results to be modeled and gives users of the parser several choices for handling errors.

sealed class ParseResult

// Subclasses must call the sealed class constructor
class Success(val value: Dto) : ParseResult()
class Failure(val reason: Exception) : ParseResult()
object EmptyRecord : ParseResult()

fun interface Parser {
    fun parse(raw: String) : ParseResult
}

fun Parser.toDtoFlux(source:Flux<String>): Flux<Dto> {
    return source.map(this::parse)
                 .flatMap { when (it) { 
                    is Success -> Mono.just(it.value)
                    is Failure -> Mono.error(it.reason) // Or Mono.empty if you don't care
                    is EmptyRecord -> Mono.empty()
                 }}
}
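
To show how a parser might produce these results, here is a rough sketch that skips failures as the question's business rule asks. The parseLine function and the Int-based Dto are hypothetical, chosen only to make the example runnable.

```kotlin
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono

// Hypothetical DTO standing in for the question's Dto.
data class Dto(val value: Int)

sealed class ParseResult
class Success(val value: Dto) : ParseResult()
class Failure(val reason: Exception) : ParseResult()
object EmptyRecord : ParseResult()

// Hypothetical parser: blank lines become EmptyRecord,
// non-numeric lines become Failure, everything else is a Success.
fun parseLine(raw: String): ParseResult =
    if (raw.isBlank()) EmptyRecord
    else try {
        Success(Dto(raw.trim().toInt()))
    } catch (e: NumberFormatException) {
        Failure(e)
    }

fun toDtoFlux(source: Flux<String>): Flux<Dto> =
    source.map { parseLine(it) }
        .flatMap { result ->
            when (result) {
                is Success -> Mono.just(result.value)
                is Failure -> Mono.empty<Dto>() // skipped here instead of propagated
                EmptyRecord -> Mono.empty<Dto>()
            }
        }

fun main() {
    println(toDtoFlux(Flux.just("1", "", "x", "4")).collectList().block())
    // [Dto(value=1), Dto(value=4)]
}
```

Because the when expression is exhaustive over the sealed class, adding a new ParseResult subclass later makes the compiler flag every place that has to handle it.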
1
votes

You can create an extension function:

fun <T, U> Flux<T>.mapNotNull(mapper: (T) -> U?): Flux<U> =
    this.flatMap { Mono.justOrEmpty(mapper(it)) }

Then you can use it like this:

fun main() {
    Flux.just("a", "b", "c")
        .mapNotNull { someNullableMapFunction(it) }
        .doOnNext { println(it) } // prints "a" and "c"
        .blockLast()
}

fun someNullableMapFunction(it: String): String? {
    if (it == "b") {
        return null
    }

    return it
}

UPDATE

Based on Simon's comment, the extension function might be more idiomatic (and more performant?) in Reactor when implemented with handle:

fun <T, U> Flux<T>.mapNotNull(mapper: (T) -> U?): Flux<U> =
    this.handle { item, sink -> mapper(item)?.let { sink.next(it) } }