
Streaming

adham90 edited this page Feb 16, 2026 · 6 revisions


Stream LLM responses in real-time as they're generated, reducing perceived latency for users.

Enabling Streaming

Per-Agent

```ruby
class StreamingAgent < ApplicationAgent
  model "gpt-4o"
  streaming true # Enable streaming for this agent

  user "{prompt}"
end
```

Global Default

```ruby
# config/initializers/ruby_llm_agents.rb
RubyLLM::Agents.configure do |config|
  config.default_streaming = true
end
```

Using Streaming with a Block

Process chunks as they arrive:

```ruby
StreamingAgent.call(user: "Write a story") do |chunk|
  print chunk.content # chunk is a RubyLLM::Chunk object
end
```

Output appears progressively:

Once... upon... a... time... 

Explicit Stream Method

To force streaming regardless of the class-level setting, use the .stream class method:

```ruby
result = MyAgent.stream(user: "Write a story") do |chunk|
  print chunk.content
end

# Access result metadata after streaming
puts "Tokens: #{result.total_tokens}"
puts "TTFT: #{result.time_to_first_token_ms}ms"
```

This method:

  • Forces streaming even if streaming false is set at class level
  • Requires a block (raises ArgumentError if none provided)
  • Returns a Result object with full metadata
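
The block-required contract can be sketched independently of the library. This is an illustration only, not the library's implementation; `stream_demo` and its hard-coded chunks are invented for the example:

```ruby
# Minimal sketch of a streaming-style method that requires a block:
# raise ArgumentError up front, then yield each chunk as it "arrives".
def stream_demo(&block)
  raise ArgumentError, "no block given" unless block

  %w[Once upon a time].each { |chunk| block.call(chunk) }
  :result # stand-in for the Result object returned after streaming
end

stream_demo { |chunk| print chunk, " " }
# Calling stream_demo without a block raises ArgumentError
```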

Streaming Result Metadata

When streaming completes, the returned Result contains streaming-specific metadata:

```ruby
result = StreamingAgent.call(user: "test") do |chunk|
  print chunk.content
end

result.streaming?             # => true
result.time_to_first_token_ms # => 245 (ms until first chunk arrived)
result.duration_ms            # => 2500 (total execution time)
```

HTTP Streaming

Server-Sent Events (SSE)

```ruby
class StreamingController < ApplicationController
  include ActionController::Live

  def stream_response
    response.headers['Content-Type'] = 'text/event-stream'
    response.headers['Cache-Control'] = 'no-cache'
    response.headers['X-Accel-Buffering'] = 'no' # Disable nginx buffering

    StreamingAgent.call(user: params[:prompt]) do |chunk|
      response.stream.write "data: #{chunk.to_json}\n\n"
    end

    response.stream.write "data: [DONE]\n\n"
  rescue ActionController::Live::ClientDisconnected
    # Client disconnected, clean up
  ensure
    response.stream.close
  end
end
```

Client-Side JavaScript

```javascript
const eventSource = new EventSource('/stream?prompt=' + encodeURIComponent(prompt));

eventSource.onmessage = (event) => {
  if (event.data === '[DONE]') {
    eventSource.close();
    return;
  }
  const chunk = JSON.parse(event.data);
  document.getElementById('output').textContent += chunk;
};

eventSource.onerror = () => {
  eventSource.close();
};
```

Turbo Streams Integration

Controller

```ruby
class ChatController < ApplicationController
  def create
    respond_to do |format|
      format.turbo_stream do
        StreamingAgent.call(user: params[:message]) do |chunk|
          Turbo::StreamsChannel.broadcast_append_to(
            "chat_#{params[:chat_id]}",
            target: "messages",
            partial: "messages/chunk",
            locals: { content: chunk }
          )
        end
      end
    end
  end
end
```

View

```erb
<%= turbo_stream_from "chat_#{@chat.id}" %>
<div id="messages"></div>
```

Time-to-First-Token (TTFT) Tracking

Streaming executions track latency metrics:

```ruby
# After streaming completes
execution = RubyLLM::Agents::Execution.last
execution.streaming?             # => true
execution.time_to_first_token_ms # => 245 (ms until first chunk)
execution.duration_ms            # => 2500 (total time)
```

Analytics

```ruby
# Average TTFT for streaming agents
RubyLLM::Agents::Execution.today.avg_time_to_first_token
# => 312
```

Note: time_to_first_token_ms is stored in the metadata JSON column, not as a direct SQL column. Use the avg_time_to_first_token analytics method for aggregation, or access it on individual instances via execution.time_to_first_token_ms.
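
If you need an aggregation the analytics method doesn't cover, the same computation can be done by hand over metadata hashes. The hash shape below is an assumption based on the note above, and `avg_ttft` is a hypothetical helper, not part of the library:

```ruby
# Average time-to-first-token across execution metadata hashes,
# skipping rows (e.g. non-streaming executions) that lack the key.
def avg_ttft(metadata_rows)
  values = metadata_rows.filter_map { |m| m["time_to_first_token_ms"] }
  return nil if values.empty?

  values.sum.to_f / values.size
end

rows = [
  { "time_to_first_token_ms" => 200 },
  { "time_to_first_token_ms" => 400 },
  { "cached" => true } # non-streaming execution, no TTFT recorded
]
avg_ttft(rows) # => 300.0
```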

Streaming with Structured Output

When using schemas, the full response is still validated:

```ruby
class StructuredStreamingAgent < ApplicationAgent
  model "gpt-4o"
  streaming true

  user "Write about {topic}"

  def schema
    @schema ||= RubyLLM::Schema.create do
      string :title
      string :content
    end
  end
end

# Stream the raw text
StructuredStreamingAgent.call(topic: "AI") do |chunk|
  print chunk # Raw JSON chunks
end
# Result is parsed and validated at the end
```

Caching and Streaming

Important: Streaming responses are not cached by design, as caching would defeat the purpose of real-time streaming.

```ruby
class MyAgent < ApplicationAgent
  streaming true
  cache_for 1.hour # Cache is ignored when streaming
end
```

If you need caching with streaming-like UX, consider:

  1. Cache the full response
  2. Simulate streaming on the client side
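
The second idea can also be sketched server-side, independent of the library: take the cached full response and emit it in small chunks with an optional delay. `simulate_stream`, the chunking regex, and the delay value are all illustration choices, not library API:

```ruby
# Simulate streaming from a cached full response by yielding it in
# word-sized chunks. The lookbehind split keeps whitespace attached,
# so concatenating the chunks reproduces the original text exactly.
def simulate_stream(full_response, delay: 0.0)
  full_response.split(/(?<=\s)/).each do |chunk|
    yield chunk
    sleep(delay) if delay.positive?
  end
end

buffer = +""
simulate_stream("Once upon a time") { |chunk| buffer << chunk }
buffer # => "Once upon a time"
```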

Error Handling

```ruby
begin
  StreamingAgent.call(user: "test") do |chunk|
    print chunk
  end
rescue Timeout::Error
  puts "\n[Stream timed out]"
rescue => e
  puts "\n[Stream error: #{e.message}]"
end
```

Streaming in Background Jobs

For long-running streams, use ActionCable:

```ruby
class StreamingJob < ApplicationJob
  def perform(prompt, channel_id)
    StreamingAgent.call(user: prompt) do |chunk|
      ActionCable.server.broadcast(
        channel_id,
        { type: 'chunk', content: chunk }
      )
    end

    ActionCable.server.broadcast(
      channel_id,
      { type: 'complete' }
    )
  end
end
```

Best Practices

Use for Long Responses

Streaming is most beneficial for:

  • Long-form content generation
  • Conversational interfaces
  • Real-time transcription/translation

Handle Disconnections

```ruby
def stream_response
  StreamingAgent.call(user: params[:prompt]) do |chunk|
    break if response.stream.closed?

    response.stream.write "data: #{chunk.to_json}\n\n"
  end
ensure
  response.stream.close
end
```

Set Appropriate Timeouts

```ruby
class LongFormAgent < ApplicationAgent
  streaming true
  timeout 180 # 3 minutes for long content
end
```

Monitor TTFT

Track time-to-first-token to ensure good UX:

```ruby
# Alert if TTFT is too high
if execution.time_to_first_token_ms > 1000
  Rails.logger.warn("High TTFT: #{execution.time_to_first_token_ms}ms")
end
```
