Store the body to a temporary file and consume
static <R> Mono<R> writeBodyToTempFileAndApply( final WebClient.ResponseSpec spec, final Function<? super Path, ? extends R> function) { return using( () -> createTempFile(null, null), t -> write(spec.bodyToFlux(DataBuffer.class), t) .thenReturn(function.apply(t)), t -> { try { deleteIfExists(t); } catch (final IOException ioe) { throw new RuntimeException(ioe); } } ); }
Pipe the body and consume
static <R> Mono<R> pipeBodyAndApply( final WebClient.ResponseSpec spec, final ExecutorService executor, final Function<? super ReadableByteChannel, ? extends R> function) { return using( Pipe::open, p -> { final Future<Disposable> future = executor.submit( () -> write(spec.bodyToFlux(DataBuffer.class), p.sink()) .log() .doFinally(s -> { try { p.sink().close(); log.debug("p.sink closed"); } catch (final IOException ioe) { throw new RuntimeException(ioe); } }) .subscribe(DataBufferUtils.releaseConsumer()) ); return just(function.apply(p.source())) .log() .doFinally(s -> { try { final Disposable disposable = future.get(); assert disposable.isDisposed(); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } }); }, p -> { try { p.source().close(); log.debug("p.source closed"); } catch (final IOException ioe) { throw new RuntimeException(ioe); } } ); }