I have a reactive-core WebClient that posts to a given endpoint. The payload is a Flux of Job objects and the content type is application/stream+json.

Flux<Job> jobFlux = Flux.just(new Job());

Mono<JsonNode> response = localEP.post().uri( "/dev/job" )
    .contentType(MediaType.APPLICATION_STREAM_JSON)
    .body( BodyInserters.fromObject(jobFlux))
    .retrieve()
    .bodyToMono( JsonNode.class );

On the server side I have tried both an annotated Spring controller and a Spring Web Reactive functional handler to process the payload of the above call, which is a Flux.

@PostMapping(path = "/dev/job", consumes = MediaType.APPLICATION_STREAM_JSON_VALUE)
@ResponseStatus(HttpStatus.CREATED)
public Mono<Void> loadJobs(@RequestBody Flux<Job> jobs) {
    return this.repository.create(jobs);
}

The domain class Job creates an id when a new object is instantiated:

  public Job() {
    UUID guid = UUID.randomUUID();
    id = guid.toString();
    title = "Random String";
  }

The repository currently is just a stub:

@Repository
public class DemoJobRepository implements ReactiveRepository<Job> {
    private static Logger logger = LoggerFactory.getLogger(DemoJobRepository.class);
    private final List<Job> jobs = Lists.newArrayList();

    @Override
    public Mono<Void> create(Publisher<Job> jobStream) {
        return Flux.from(jobStream).doOnNext(jobs::add).then();
    }

    @Override
    public Flux<Job> getAll() {
        return Flux.fromIterable(jobs);
    }

    @Override
    public Mono<Job> findById(String id) {
        return null;
    }
}

I don't see the client attempting to send the request body. When I call block on the client to get the result, I see the client send the request, but the server endpoint always sees an empty Flux. Any help is much appreciated.

From what I understand I should not "break the reactive pipeline and decouple the reading of the request and the writing of the response; chances are the HTTP exchange could be closed before your controller has a chance to read the whole request. Calling subscribe does just that." Where are you saying to call subscribe in the repository? - gdash27
Also, I'm using the content type application/stream+json because the intent is to stream data over one connection as opposed to making many individual requests. - gdash27
I'm not sure you understand my intention. The second sentence of my question is: "The payload is a flux of Job objects and the content-type is application/stream+json". Thus I don't intend to create multiple requests, but rather to stream the payload until it is complete. - gdash27
You send one object to the server. Streaming makes absolutely no difference. Streaming JSON data means sending objects individually instead of all at once, which can reduce memory consumption. There is no difference concerning requests. - a better oliver

1 Answer

With reactive types, nothing happens until you subscribe: building the reactive pipeline does not execute what it's supposed to do; only subscribing to it starts the process.

There are several ways to subscribe:

  • calling any of the subscribe variants. Many of them take lambdas as parameters that are executed when the processing is done or ends with an error. Using the no-argument variant is a bit risky because it just launches the execution and you get no callback whatsoever. Technically, nothing is waiting on it, so the JVM could exit before the processing is done.

  • calling any of the block/collect methods. This not only subscribes but also returns the expected value(s).

Neither of those should be done within a method that returns a reactive type; otherwise this will lead to serious problems in your application.
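
To make the "nothing happens until you subscribe" point concrete, here is a minimal sketch. It deliberately uses the JDK's java.util.concurrent.Flow interfaces as a stand-in for Reactor's Flux so that it runs without extra dependencies; it is not the code from the question. The names SubscribeDemo, coldPublisher, subscribeAll and run are hypothetical, and the side effect plays the role of doOnNext(jobs::add) in the repository stub.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Consumer;

public class SubscribeDemo {

    // Hypothetical stand-in for Flux.fromIterable(items).doOnNext(sideEffect):
    // a cold publisher that does no work until a subscriber requests data.
    static <T> Flow.Publisher<T> coldPublisher(List<T> items, Consumer<T> sideEffect) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private int index = 0;
            private boolean finished = false;

            @Override
            public void request(long n) {
                // Emit up to n items, running the side effect for each one.
                while (n-- > 0 && index < items.size() && !finished) {
                    T item = items.get(index++);
                    sideEffect.accept(item);
                    subscriber.onNext(item);
                }
                if (index >= items.size() && !finished) {
                    finished = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                finished = true;
            }
        });
    }

    // Subscribes and requests everything, similar in spirit to subscribe() in Reactor.
    static <T> void subscribeAll(Flow.Publisher<T> publisher) {
        publisher.subscribe(new Flow.Subscriber<T>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(T item) { }
            @Override public void onError(Throwable t) { }
            @Override public void onComplete() { }
        });
    }

    // Returns the size of the store before and after subscribing.
    public static int[] run() {
        List<String> store = new ArrayList<>();
        Flow.Publisher<String> pipeline = coldPublisher(List.of("job-1", "job-2"), store::add);
        int before = store.size();   // pipeline is built, but nothing has subscribed yet
        subscribeAll(pipeline);      // subscribing is what triggers the side effect
        int after = store.size();
        return new int[] { before, after };
    }

    public static void main(String[] args) {
        int[] sizes = run();
        System.out.println("before subscribe: " + sizes[0]);
        System.out.println("after subscribe: " + sizes[1]);
    }
}
```

In a WebFlux controller you would normally not subscribe yourself as subscribeAll does here; you return the Mono or Flux and let the framework subscribe to it.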