Streaming
Stream LLM responses in real-time as they're generated, reducing perceived latency for users.
Enable streaming for a single agent:

```ruby
class StreamingAgent < ApplicationAgent
  model "gpt-4o"
  streaming true  # Enable streaming for this agent

  user "{prompt}"
end
```

Or enable it by default for all agents:

```ruby
# config/initializers/ruby_llm_agents.rb
RubyLLM::Agents.configure do |config|
  config.default_streaming = true
end
```

Process chunks as they arrive:
```ruby
StreamingAgent.call(user: "Write a story") do |chunk|
  print chunk.content  # chunk is a RubyLLM::Chunk object
end
```

Output appears progressively:
```
Once... upon... a... time...
```

For more explicit streaming, use the `.stream()` class method, which forces streaming regardless of class settings:
```ruby
result = MyAgent.stream(user: "Write a story") do |chunk|
  print chunk.content
end

# Access result metadata after streaming
puts "Tokens: #{result.total_tokens}"
puts "TTFT: #{result.time_to_first_token_ms}ms"
```

This method:
- Forces streaming even if `streaming false` is set at class level
- Requires a block (raises `ArgumentError` if none provided)
- Returns a `Result` object with full metadata
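The contract in the list above can be illustrated with a small, self-contained sketch. `ToyAgent`, `ToyResult`, and the hard-coded chunks are hypothetical stand-ins invented for this example; they are not the library's implementation, and the timing logic is only one plausible way such metadata could be collected.

```ruby
# Hypothetical stand-in for a result object; not the library's Result class.
ToyResult = Struct.new(:content, :time_to_first_token_ms, :duration_ms) do
  def streaming?
    true
  end
end

# Hypothetical stand-in for an agent, used only to show the block contract.
class ToyAgent
  FAKE_CHUNKS = ["Once ", "upon ", "a ", "time"].freeze

  def self.stream(user:)
    # A block is mandatory, mirroring the behaviour described above.
    raise ArgumentError, "a block is required for streaming" unless block_given?

    started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    first_chunk_at = nil
    buffer = +""

    FAKE_CHUNKS.each do |text|
      # Record the moment the first chunk becomes available.
      first_chunk_at ||= Process.clock_gettime(Process::CLOCK_MONOTONIC)
      buffer << text
      yield text
    end

    finished = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    ToyResult.new(
      buffer,
      ((first_chunk_at - started) * 1000).round,
      ((finished - started) * 1000).round
    )
  end
end

result = ToyAgent.stream(user: "Write a story") { |text| print text }
puts
puts "Streaming: #{result.streaming?}"

begin
  ToyAgent.stream(user: "no block given")
rescue ArgumentError => e
  puts "Rejected: #{e.message}"
end
```

The sketch accumulates the full text while yielding each piece, so the caller gets both progressive output and a complete result once the stream ends.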
When streaming completes, the returned Result contains streaming-specific metadata:
```ruby
result = StreamingAgent.call(user: "test") do |chunk|
  print chunk.content
end

result.streaming?               # => true
result.time_to_first_token_ms   # => 245 (ms until first chunk arrived)
result.duration_ms              # => 2500 (total execution time)
```

In a Rails controller, stream chunks to the browser as Server-Sent Events using `ActionController::Live`:

```ruby
class StreamingController < ApplicationController
  include ActionController::Live

  def stream_response
    response.headers['Content-Type'] = 'text/event-stream'
    response.headers['Cache-Control'] = 'no-cache'
    response.headers['X-Accel-Buffering'] = 'no'  # Disable nginx buffering

    StreamingAgent.call(user: params[:prompt]) do |chunk|
      # Send only the chunk text so the client gets a predictable payload
      payload = { content: chunk.content }.to_json
      response.stream.write "data: #{payload}\n\n"
    end

    response.stream.write "data: [DONE]\n\n"
  rescue ActionController::Live::ClientDisconnected
    # Client disconnected, clean up
  ensure
    response.stream.close
  end
end
```

On the client, consume the events with `EventSource`:

```javascript
const eventSource = new EventSource('/stream?prompt=' + encodeURIComponent(prompt));

eventSource.onmessage = (event) => {
  if (event.data === '[DONE]') {
    eventSource.close();
    return;
  }
  const chunk = JSON.parse(event.data);
  // Append the chunk text, not the parsed object itself
  document.getElementById('output').textContent += chunk.content;
};

eventSource.onerror = () => {
  eventSource.close();
};
```

With Turbo Streams, broadcast each chunk to the page instead:

```ruby
class ChatController < ApplicationController
  def create
    respond_to do |format|
      format.turbo_stream do
        StreamingAgent.call(user: params[:message]) do |chunk|
          Turbo::StreamsChannel.broadcast_append_to(
            "chat_#{params[:chat_id]}",
            target: "messages",
            partial: "messages/chunk",
            locals: { content: chunk.content }
          )
        end
      end
    end
  end
end
```

```erb
<%= turbo_stream_from "chat_#{@chat.id}" %>
<div id="messages"></div>
```

Streaming executions track latency metrics:
```ruby
# After streaming completes
execution = RubyLLM::Agents::Execution.last

execution.streaming?               # => true
execution.time_to_first_token_ms   # => 245 (ms until first chunk)
execution.duration_ms              # => 2500 (total time)
```

```ruby
# Average TTFT for streaming agents
RubyLLM::Agents::Execution.today.avg_time_to_first_token
# => 312
```

Note:
`time_to_first_token_ms` is stored in the `metadata` JSON column, not as a direct SQL column. Use the `avg_time_to_first_token` analytics method for aggregation, or access it on individual instances via `execution.time_to_first_token_ms`.
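Because the value lives inside a JSON column, an average has to be computed from the decoded metadata rather than with a plain SQL `AVG` on a dedicated column. The following is a minimal sketch of that idea in plain Ruby. The helper `average_ttft_ms` and the string-keyed hash shape are assumptions made for illustration; the library's own `avg_time_to_first_token` may work differently.

```ruby
# Hypothetical helper: averages TTFT over already-decoded metadata hashes.
# The key name follows the note above; the string-keyed shape is an assumption.
def average_ttft_ms(metadata_rows)
  # Skip rows where no TTFT was recorded.
  values = metadata_rows.filter_map { |meta| meta["time_to_first_token_ms"] }
  return nil if values.empty?

  (values.sum.to_f / values.size).round
end

rows = [
  { "time_to_first_token_ms" => 245 },
  { "time_to_first_token_ms" => 379 },
  {} # an execution with no TTFT recorded
]

puts average_ttft_ms(rows) # prints 312
```

Rows without a recorded TTFT are ignored rather than counted as zero, so they do not drag the average down.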
When using schemas, the full response is still validated:
```ruby
class StructuredStreamingAgent < ApplicationAgent
  model "gpt-4o"
  streaming true

  user "Write about {topic}"

  def schema
    @schema ||= RubyLLM::Schema.create do
      string :title
      string :content
    end
  end
end

# Stream the raw text
StructuredStreamingAgent.call(topic: "AI") do |chunk|
  print chunk.content  # Raw JSON chunks
end
# Result is parsed and validated at the end
```

Important: Streaming responses are not cached by design, as caching would defeat the purpose of real-time streaming.
```ruby
class MyAgent < ApplicationAgent
  streaming true
  cache_for 1.hour  # Cache is ignored when streaming
end
```

If you need caching with streaming-like UX, consider:
- Cache the full response
- Simulate streaming on the client side
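One way to picture the second option is to split an already-cached response into small pieces and replay them one at a time. The sketch below does this in plain Ruby; in practice the replay would more likely run in browser code. `chunk_for_replay` and the sample text are hypothetical and not part of the library.

```ruby
# Hypothetical helper: splits a complete (cached) response into word groups.
# Whitespace stays attached to each word, so for non-blank text the pieces
# join back into the original string.
def chunk_for_replay(text, words_per_chunk: 3)
  text.scan(/\s*\S+\s*/).each_slice(words_per_chunk).map(&:join)
end

cached = "Once upon a time there was a cache hit."
pieces = chunk_for_replay(cached)

# Replay the pieces; a real UI would add a short delay between them.
pieces.each { |piece| print piece }
puts
```

Chunking by words rather than characters keeps the number of updates small while still giving a progressive feel.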
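Handle timeouts and other errors by wrapping the streaming call in `begin`/`rescue`:

```ruby
begin
  StreamingAgent.call(user: "test") do |chunk|
    print chunk.content
  end
rescue Timeout::Error
  puts "\n[Stream timed out]"
rescue => e
  puts "\n[Stream error: #{e.message}]"
end
```

For long-running streams, use ActionCable: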
```ruby
class StreamingJob < ApplicationJob
  def perform(prompt, channel_id)
    StreamingAgent.call(user: prompt) do |chunk|
      ActionCable.server.broadcast(
        channel_id,
        { type: 'chunk', content: chunk.content }
      )
    end

    ActionCable.server.broadcast(
      channel_id,
      { type: 'complete' }
    )
  end
end
```

Streaming is most beneficial for:
- Long-form content generation
- Conversational interfaces
- Real-time transcription/translation
Stop streaming once the response stream has been closed:

```ruby
def stream_response
  StreamingAgent.call(user: params[:prompt]) do |chunk|
    break if response.stream.closed?

    payload = { content: chunk.content }.to_json
    response.stream.write "data: #{payload}\n\n"
  end
ensure
  response.stream.close
end
```

Give long-form agents a longer timeout:

```ruby
class LongFormAgent < ApplicationAgent
  streaming true
  timeout 180  # 3 minutes for long content
end
```

Track time-to-first-token to ensure good UX:
```ruby
# Alert if TTFT is too high (guard against nil in case no TTFT was recorded)
ttft = execution.time_to_first_token_ms
if ttft && ttft > 1000
  Rails.logger.warn("High TTFT: #{ttft}ms")
end
```

Related pages:

- Agent DSL - Configuration options
- Execution Tracking - TTFT analytics
- Dashboard - Monitoring streaming metrics