
I'm using WebClient and custom BodyExtractorclass for my spring-boot application

WebClient webLCient = WebClient.create();
   .uri(url, params)
   .flatMap(response -> {
     return response.body(new BodyExtractor());


public Mono<T> extract(ClientHttpResponse response, BodyExtractor.Context context) {
  Flux<DataBuffer> body = response.getBody();
  body.map(dataBuffer -> {
    try {
      JaxBContext jc = JaxBContext.newInstance(SomeClass.class);
      Unmarshaller unmarshaller = jc.createUnmarshaller();

      return (T) unmarshaller.unmarshal(dataBuffer.asInputStream())
    } catch(Exception e){
       return null;

Above code works with small payload but not on a large payload, I think it's because I'm only reading a single flux value with next and I'm not sure how to combine and read all dataBuffer.

I'm new to reactor, so I don't know a lot of tricks with flux/mono.


6 Answers


This is really not as complicated as other answers imply.

The only way to stream the data without buffering it all in memory is to use a pipe, as @jin-kwon suggested. However, it can be done very simply by using Spring's BodyExtractors and DataBufferUtils utility classes.


private InputStream readAsInputStream(String url) throws IOException {
    PipedOutputStream osPipe = new PipedOutputStream();
    PipedInputStream isPipe = new PipedInputStream(osPipe);

    ClientResponse response = webClient.get().uri(url)
    final int statusCode = response.rawStatusCode();
    // check HTTP status code, can throw exception if needed
    // ....

    Flux<DataBuffer> body = response.body(BodyExtractors.toDataBuffers())
        .doOnError(t -> {
            log.error("Error reading body.", t);
            // close pipe to force InputStream to error,
            // otherwise the returned InputStream will hang forever if an error occurs
            try(isPipe) {
            } catch (IOException ioe) {
                log.error("Error closing streams", ioe);
        .doFinally(s -> {
            try(osPipe) {
            } catch (IOException ioe) {
                log.error("Error closing streams", ioe);

    DataBufferUtils.write(body, osPipe)

    return isPipe;

If you don't care about checking the response code or throwing an exception for a failure status code, you can skip the block() call and intermediate ClientResponse variable by using

flatMap(r -> r.body(BodyExtractors.toDataBuffers()))



A slightly modified version of Bk Santiago's answer makes use of reduce() instead of collect(). Very similar, but doesn't require an extra class:


body.reduce(new InputStream() {
    public int read() { return -1; }
  }, (s: InputStream, d: DataBuffer) -> new SequenceInputStream(s, d.asInputStream())
).flatMap(inputStream -> /* do something with single InputStream */

Or Kotlin:

body.reduce(object : InputStream() {
  override fun read() = -1
}) { s: InputStream, d -> SequenceInputStream(s, d.asInputStream()) }
  .flatMap { inputStream -> /* do something with single InputStream */ }

Benefit of this approach over using collect() is simply you don't need to have a different class to gather things up.

I created a new empty InputStream(), but if that syntax is confusing, you can also replace it with ByteArrayInputStream("".toByteArray()) instead to create an empty ByteArrayInputStream as your initial value instead.


Here comes another variant from other answers. And it's still not memory-friendly.

static Mono<InputStream> asStream(WebClient.ResponseSpec response) {
    return response.bodyToFlux(DataBuffer.class)
        .map(b -> b.asInputStream(true))

static void doSome(WebClient.ResponseSpec response) {
        .doOnNext(stream -> {
            // do some with stream
            // close the stream!!!

I was able to make it work by using Flux#collect and SequenceInputStream

public Mono<T> extract(ClientHttpResponse response, BodyExtractor.Context context) {
  Flux<DataBuffer> body = response.getBody();
  return body.collect(InputStreamCollector::new, (t, dataBuffer)-> t.collectInputStream(dataBuffer.asInputStream))
    .map(inputStream -> {
      try {
        JaxBContext jc = JaxBContext.newInstance(SomeClass.class);
        Unmarshaller unmarshaller = jc.createUnmarshaller();

        return (T) unmarshaller.unmarshal(inputStream);
      } catch(Exception e){
        return null;


public class InputStreamCollector {
  private InputStream is;

  public void collectInputStream(InputStream is) {
    if (this.is == null) this.is = is;
    this.is = new SequenceInputStream(this.is, is);

  public InputStream getInputStream() {
    return this.is;

There's a much cleaner way to do this using the underlying reactor-netty HttpClient directly, instead of using WebClient. The composition hierarchy is like this:

WebClient -uses-> HttpClient -uses-> TcpClient

Easier to show code than explain:

    .responseContent() // ByteBufFlux
    .aggregate() // ByteBufMono
    .asInputStream() // Mono<InputStream>
    .block() // We got an InputStream, yay!

However, as I've pointed out already, using InputStream is a blocking operation, that defeats the purpose of using a non-blocking HTTP client, not to mention aggregating the whole response. See this for a Java NIO vs. IO comparison.


You can use pipes.

static <R> Mono<R> pipeAndApply(
        final Publisher<DataBuffer> source, final Executor executor,
        final Function<? super ReadableByteChannel, ? extends R> function) {
    return using(Pipe::open,
                 p -> {
                     executor.execute(() -> write(source, p.sink())
                             .doFinally(s -> {
                                 try {
                                 } catch (final IOException ioe) {
                                     log.error("failed to close pipe.sink", ioe);
                                     throw new RuntimeException(ioe);
                     return just(function.apply(p.source()));
                 p -> {
                     try {
                     } catch (final IOException ioe) {
                         log.error("failed to close pipe.source", ioe);
                         throw new RuntimeException(ioe);

Or using CompletableFuture,

static <R> Mono<R> pipeAndApply(
        final Publisher<DataBuffer> source,
        final Function<? super ReadableByteChannel, ? extends R> function) {
    return using(Pipe::open,
                 p -> fromFuture(supplyAsync(() -> function.apply(p.source())))
                         .doFirst(() -> write(source, p.sink())
                                 .doFinally(s -> {
                                     try {
                                     } catch (final IOException ioe) {
                                         log.error("failed to close pipe.sink", ioe);
                                         throw new RuntimeException(ioe);
                 p -> {
                     try {
                     } catch (final IOException ioe) {
                         log.error("failed to close pipe.source", ioe);
                         throw new RuntimeException(ioe);