0
votes

We are working with spring boot 2.0.0.BUILD_SNAPSHOT and spring boot webflux 5.0.0 and currently we cant transfer a flux to a client on request.

Currently I am creating the flux from an iterator:

public Flux<ItemIgnite> getAllFlux() {
    Iterator<Cache.Entry<String, ItemIgnite>> iterator = this.getAllIterator();

    return Flux.create(flux -> {
        while(iterator.hasNext()) {
            flux.next(iterator.next().getValue());
        }
    });
}

And on request I am simply doing:

@RequestMapping(value="/all", method=RequestMethod.GET, produces="application/json")
public Flux<ItemIgnite> getAllFlux() {
    return this.provider.getAllFlux();
}

When I now locally call localhost:8080/all after 10 seconds I get a 503 status code. Also as at client when I request /all using the WebClient:

public Flux<ItemIgnite> getAllPoducts(){
    WebClient webClient = WebClient.create("http://localhost:8080");

    Flux<ItemIgnite> f = webClient.get().uri("/all").accept(MediaType.ALL).exchange().flatMapMany(cr -> cr.bodyToFlux(ItemIgnite.class));
    f.subscribe(System.out::println);
    return f;

}

Nothing happens. No data is transferred.

When I do the following instead:

public Flux<List<ItemIgnite>> getAllFluxMono() {
    return Flux.just(this.getAllList());
}

and

@RequestMapping(value="/allMono", method=RequestMethod.GET, produces="application/json")
public Flux<List<ItemIgnite>> getAllFluxMono() {
    return this.provider.getAllFluxMono();
}

It is working. I guess its because all data is already finished loading and just transferred to the client as it usually would transfer data without using a flux.

What do I have to change to get the flux streaming the data to the web client which requests those data?

EDIT

I have data inside an ignite cache. So my getAllIterator is loading the data from the ignite cache:

public Iterator<Cache.Entry<String, ItemIgnite>> getAllIterator() {
    return this.igniteCache.iterator();
}

EDIT

adding flux.complete() like @Simon Baslé suggested:

public Flux<ItemIgnite> getAllFlux() {
    Iterator<Cache.Entry<String, ItemIgnite>> iterator = this.getAllIterator();

    return Flux.create(flux -> {
        while(iterator.hasNext()) {
            flux.next(iterator.next().getValue());
        }

        flux.complete(); // see here
    });
}

Solves the 503 problem in the browser. But it does not solve the problem with the WebClient. There is still no data transferred.

EDIT 3

using publishOn with Schedulers.parallel():

public Flux<ItemIgnite> getAllFlux() {
    Iterator<Cache.Entry<String, ItemIgnite>> iterator = this.getAllIterator();

    return Flux.<ItemIgnite>create(flux -> {
        while(iterator.hasNext()) {
            flux.next(iterator.next().getValue());
        }

        flux.complete();
    }).publishOn(Schedulers.parallel());
}

Does not change the result.

Here I post you what the WebClient receives:

value :[Item ID: null, Product Name: null, Product Group: null]
complete

So it seems like he is getting One item (out of over 35.000) and the values are null and he is finishing after.

2
Could you explain what getAllIterator is doing? Is it blocking? Reading data from a database? From memory?Brian Clozel

2 Answers

4
votes

One thing that jumps out is that you never call flux.complete() in your create.

But there's actually a factory operator that is tailored to transform an Iterable to a Flux, so you could just do Flux.fromIterable(this)

Edit: in case your Iterator is hiding complexity like a DB request (or any blocking I/O), be advised this spells trouble: anything blocking in a reactive chain, if not isolated on a dedicated execution context using publishOn, has the potential to block not only the entire chain but other reactive processes has well (as threads can and will be used by multiple reactive processes).

Neither create nor fromIterable do anything in particular to protect from blocking sources. I think you are facing that kind of issue, judging from the hang you get with the WebClient.

0
votes

The problem was my Object ItemIgnite which I transfer. The system Flux seems not to be able to handle this. Because If I change my original code to the following:

public Flux<String> getAllFlux() {
    Iterator<Cache.Entry<String, ItemIgnite>> iterator = this.getAllIterator();

    return Flux.create(flux -> {
        while(iterator.hasNext()) {
            flux.next(iterator.next().getValue().toString());
        }
    });
}

Everything is working fine. Without publishOn and without flux.complete(). Maybe someone has an idea why this is working.