Implementing Streaming Responses in Spring AI with WebFlux

Modern AI applications demand responsive, real-time interactions — and streaming responses are central to achieving that. Unlike traditional blocking calls that wait for full output before returning, streaming delivers content incrementally, improving perceived latency and user engagement. In Spring AI, this capability is built atop Spring WebFlux’s reactive stack, enabling non-blocking, event-driven communication with LLMs.

To enable streaming, simply chain the stream() method after constructing your prompt:

@GetMapping(value = "/ai-stream", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
Flux<String> streamResponse(@RequestParam String query) {
    return chatClient.prompt()
            .user(query)
            .stream()
            .content();
}

Here, the return type shifts from CallResponseSpec (blocking) to StreamResponseSpec, and the output becomes a Flux<String> — a reactive stream that emits text chunks as they arrive from the model.
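
For comparison, here is a minimal sketch of the blocking variant (assuming the same injected ChatClient and a handler shaped like the one above): replacing stream() with call() waits for the complete answer and returns it as a single String.

@GetMapping("/ai")
String blockingResponse(@RequestParam String query) {
    return chatClient.prompt()
            .user(query)
            .call()        // CallResponseSpec: waits until the full response is available
            .content();    // the complete answer as one String
}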

Understanding Flux and Reactive Streams

Flux is a core type in Project Reactor, representing a stream of 0 to N items emitted asynchronously. Unlike Mono (which emits at most one value), Flux is ideal for continuous data flows — exactly what streaming LLM responses require.
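
As a standalone illustration using Project Reactor directly (class name and values are purely for demonstration), a Mono completes after emitting at most one element, while a Flux keeps emitting until the stream completes:

import java.time.Duration;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ReactorBasics {
    public static void main(String[] args) throws InterruptedException {
        // Mono: at most one value, then completion.
        Mono.just("one answer").subscribe(System.out::println);

        // Flux: zero to N values emitted over time, e.g. one chunk every 100 ms.
        Flux.just("Hel", "lo, ", "wor", "ld")
                .delayElements(Duration.ofMillis(100))
                .subscribe(System.out::print);

        Thread.sleep(600); // demo only: keep the JVM alive while delayed elements arrive
    }
}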

Spring WebFlux leverages non-blocking I/O via Netty or Servlet 3.1+ containers, allowing a single thread to handle thousands of concurrent connections. Instead of blocking while waiting for an external API response, the thread is released back to the pool, and processing resumes only when data becomes available.
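
A small sketch of that non-blocking behavior with WebClient (the target URL is illustrative): the call returns a Mono immediately, no thread waits idle for the response, and the callback runs on an event-loop thread once data arrives.

import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;

public class NonBlockingCallDemo {
    public static void main(String[] args) throws InterruptedException {
        WebClient client = WebClient.create("https://example.org");

        // Returns immediately with a Mono; the request runs on Netty's event loop.
        Mono<String> body = client.get()
                .uri("/slow-endpoint")
                .retrieve()
                .bodyToMono(String.class);

        // The callback is invoked only when the response body is available.
        body.subscribe(response -> System.out.println("Received: " + response));

        Thread.sleep(5_000); // demo only: keep the JVM alive while the request completes
    }
}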

Behind the Scenes: How Streaming Works in Spring AI

The streaming pipeline begins with the stream().content() call shown in the controller above, which internally invokes:

public Flux<String> content() {
    return doGetFluxChatResponse(this.request)
            .map(response -> {
                if (response.getResult() == null ||
                    response.getResult().getOutput() == null ||
                    response.getResult().getOutput().getContent() == null) {
                    return "";
                }
                return response.getResult().getOutput().getContent();
            })
            .filter(chunk -> !chunk.isBlank());
}

This maps each ChatResponse to its text payload and filters out empty or null segments. The real magic lies in doGetFluxChatResponse, which delegates to the underlying model:

private Flux<ChatResponse> doGetFluxChatResponse(DefaultChatClientRequestSpec request) {
    // (simplified) the Prompt is first assembled from the request spec
    Flux<ChatResponse> chatResponseFlux = chatModel.stream(prompt);
    return new MessageAggregator().aggregate(chatResponseFlux, observationContext::setResponse);
}

Here, chatModel.stream(prompt) initiates the connection to the LLM provider, typically OpenAI’s streaming endpoint. Internally, the OpenAI chat model delegates to a low-level API client whose chatCompletionStream method uses WebClient to make a non-blocking HTTP POST request:

public Flux<ChatCompletionChunk> chatCompletionStream(
        ChatCompletionRequest request,
        MultiValueMap<String, String> headers) {

    Assert.notNull(request, "Request must not be null");
    Assert.isTrue(request.isStream(), "Stream must be enabled in request");

    return webClient.post()
            .uri(completionsPath)
            .headers(h -> h.addAll(headers))
            .bodyValue(request)
            .retrieve()
            .bodyToFlux(String.class)
            .takeUntil(SSE_DONE_PREDICATE)           // Stop when [DONE] is received
            .filter(SSE_DONE_PREDICATE.negate())      // Exclude [DONE] marker
            .map(content -> JsonUtils.fromJson(content, ChatCompletionChunk.class))
            .onErrorResume(ex -> Flux.error(new AiException("Stream failed", ex)));
}

This code:

  • Sends a POST request to the LLM’s streaming endpoint with stream=true
  • Uses bodyToFlux(String.class) to consume the server-sent events (SSE) as a stream of text chunks
  • Filters out the termination signal [DONE] using a predicate (a standalone sketch of this step follows the list)
  • Parses each line as a JSON-encoded ChatCompletionChunk object
  • Wraps any transport or parsing error in an AiException so callers receive a consistent error signal
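
To see how the takeUntil and filter pair behaves around the [DONE] marker, here is a standalone sketch (the payload strings and the predicate definition are illustrative stand-ins, not Spring AI’s actual constants):

import java.util.function.Predicate;
import reactor.core.publisher.Flux;

public class SseDoneDemo {

    // Illustrative stand-in for the [DONE] sentinel check used above.
    private static final Predicate<String> SSE_DONE_PREDICATE = "[DONE]"::equals;

    public static void main(String[] args) {
        // Synthetic SSE data payloads, as a streaming completion might deliver them.
        Flux<String> ssePayloads = Flux.just(
                "{\"choices\":[{\"delta\":{\"content\":\"Hello\"}}]}",
                "{\"choices\":[{\"delta\":{\"content\":\" world\"}}]}",
                "[DONE]",
                "{\"choices\":[{\"delta\":{\"content\":\"never emitted\"}}]}");

        ssePayloads
                .takeUntil(SSE_DONE_PREDICATE)        // complete once [DONE] arrives
                .filter(SSE_DONE_PREDICATE.negate())  // drop the [DONE] marker itself
                .subscribe(System.out::println);      // prints only the two JSON chunks
    }
}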

Each chunk contains a partial response — often a single word or phrase — which is aggregated and emitted as a continuous string stream to the client. This enables browsers or mobile clients to render text incrementally, mimicking human-like typing behavior.
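
On the consuming side, a browser would typically read the stream with fetch or EventSource; as a JVM-side sketch (base URL and query text are illustrative, the path matches the controller above), WebClient can print chunks as they arrive:

import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;

public class StreamingClientDemo {
    public static void main(String[] args) throws InterruptedException {
        Flux<String> chunks = WebClient.create("http://localhost:8080")
                .get()
                .uri(uri -> uri.path("/ai-stream")
                        .queryParam("query", "Explain reactive streams")
                        .build())
                .retrieve()
                .bodyToFlux(String.class);

        // Render each chunk as soon as it arrives instead of waiting for the full answer.
        chunks.subscribe(System.out::print);

        Thread.sleep(30_000); // demo only: keep the JVM alive while the stream completes
    }
}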

Crucially, no thread is blocked while waiting for the next chunk. The underlying Netty event loop handles I/O asynchronously, and Reactor’s backpressure mechanism ensures consumers aren’t overwhelmed by rapid emissions.
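
Because the result is an ordinary Flux, Reactor’s standard operators can be applied to bound that flow further; a hedged variant of the earlier endpoint (operator values and the path are illustrative):

@GetMapping(value = "/ai-stream-throttled", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
Flux<String> throttledStream(@RequestParam String query) {
    return chatClient.prompt()
            .user(query)
            .stream()
            .content()
            .limitRate(32)              // request upstream chunks in batches of 32
            .onBackpressureBuffer(256); // buffer briefly if the consumer falls behind
}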

Tags: SpringAI WebFlux ReactiveProgramming Flux OpenAIStreaming

Posted on Thu, 14 May 2026 16:11:44 +0000 by Dixen