Streaming

Streaming lets you process an LLM response incrementally as it is generated instead of waiting for the whole thing. It’s essential for responsive UIs and long-form output. Rig mirrors its non-streaming traits with streaming equivalents, all in the rig::streaming module.

The simplest case: prompt an agent and print tokens as they arrive. Instead of prompt(), call stream_prompt() (from the StreamingPrompt trait). An agent’s stream yields MultiTurnStreamItem values, which you match on to handle text deltas and the final response.

use futures::StreamExt;
use rig::agent::MultiTurnStreamItem;
use rig::client::{CompletionClient, ProviderClient};
use rig::providers::openai;
use rig::streaming::{StreamedAssistantContent, StreamingPrompt};

#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
    let openai = openai::Client::from_env()?;
    let agent = openai
        .agent("gpt-5.5")
        .preamble("You are a storyteller.")
        .temperature(0.9)
        .build();

    let mut stream = agent.stream_prompt("Tell me a short story about a robot.").await;
    while let Some(item) = stream.next().await {
        match item? {
            // Print each text delta as soon as it arrives.
            MultiTurnStreamItem::StreamAssistantItem(StreamedAssistantContent::Text(text)) => {
                print!("{}", text.text);
            }
            // The turn is complete: end the line.
            MultiTurnStreamItem::FinalResponse(_) => println!(),
            _ => {}
        }
    }
    Ok(())
}
Once, in a quiet workshop, a small robot named Bolt woke to the hum of morning
light. It had one task left unfinished: to water the single flower on the bench.
Reaching out with a careful claw, Bolt tipped the can — and for the first time,
watched a petal open just for it.

The streaming traits mirror the non-streaming completion traits one-to-one:

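| Non-streaming | Streaming           | Description                              |
|---------------|---------------------|------------------------------------------|
| Prompt        | StreamingPrompt     | One-shot streaming prompt                |
| Chat          | StreamingChat       | Streaming chat with history              |
| Completion    | StreamingCompletion | Low-level streaming completion interface |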

To stream with conversation history, call stream_chat. It yields the same MultiTurnStreamItem stream as stream_prompt and additionally takes the chat history:

use rig::streaming::StreamingChat;
let mut stream = agent.stream_chat("Continue the story", chat_history).await;

The low-level interface returns a request builder so you can customise the request before sending it:

use rig::streaming::StreamingCompletion;

let builder = agent.stream_completion("prompt", chat_history).await?;
let response = builder
    .temperature(0.9)
    .stream()
    .await?;

MultiTurnStreamItem (in rig::agent) is what an agent’s stream_prompt / stream_chat yields across the multi-turn loop. Match StreamAssistantItem(...) to read per-token content deltas and FinalResponse(...) for the completed turn. Because the whole agent loop flows through this stream, you can observe tool calls and their results in real time.
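The matching pattern described above can be sketched without any async machinery. This is a minimal, std-only model, not Rig's API: StreamItem, Collected, and collect_items are hypothetical names, and a plain iterator of Result values stands in for the agent's stream. It shows text deltas being accumulated, tool calls being observed as they pass through, and partial output being kept when an item fails mid-stream.

```rust
/// Hypothetical, simplified stand-in for the items an agent stream yields.
/// Rig's real MultiTurnStreamItem carries richer payloads.
#[derive(Debug, Clone, PartialEq)]
pub enum StreamItem {
    /// A text delta from the assistant.
    Text(String),
    /// A tool call observed during the agent loop (name only, for brevity).
    ToolCall(String),
    /// Marks the completed turn.
    Final,
}

/// Everything gathered from the stream so far.
#[derive(Debug, Default, PartialEq)]
pub struct Collected {
    pub text: String,
    pub tool_calls: Vec<String>,
    pub finished: bool,
}

/// Folds a sequence of per-item results into `Collected`.
/// On the first error it stops and returns the partial output with the error,
/// so text that already arrived is not lost.
pub fn collect_items<I, E>(items: I) -> (Collected, Option<E>)
where
    I: IntoIterator<Item = Result<StreamItem, E>>,
{
    let mut out = Collected::default();
    for item in items {
        match item {
            Ok(StreamItem::Text(delta)) => out.text.push_str(&delta),
            Ok(StreamItem::ToolCall(name)) => out.tool_calls.push(name),
            Ok(StreamItem::Final) => out.finished = true,
            Err(err) => return (out, Some(err)),
        }
    }
    (out, None)
}
```

In a real agent loop the same match arms would sit inside the while let Some(item) = stream.next().await loop; returning the partial Collected alongside the error is a design choice that lets a UI keep whatever was already rendered.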

StreamedAssistantContent (in rig::streaming) is a single piece of streamed assistant output. The main variants are a text delta (Text, whose text you read via text.text), a tool-call delta (partial tool name/arguments, streamed piece by piece), and a final usage event carrying token counts for the whole completion. Buffer tool-call deltas until the call is complete before executing the tool.
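Buffering tool-call deltas until the call is complete might look like the sketch below. The ToolCallDelta, CompletedToolCall, and ToolCallBuffer types are hypothetical and std-only; in particular, the Start / Arguments / End split is an assumption made for illustration, and the shape of Rig's actual tool-call delta variant should be taken from the API docs.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a streamed tool-call fragment.
/// Rig's real delta type differs; this only models the buffering idea.
#[derive(Debug, Clone, PartialEq)]
pub enum ToolCallDelta {
    /// First fragment: announces the call id and the tool name.
    Start { id: String, name: String },
    /// A slice of the JSON arguments string for an already-started call.
    Arguments { id: String, fragment: String },
    /// Signals that the call identified by `id` is complete.
    End { id: String },
}

/// A fully assembled call that is safe to execute.
#[derive(Debug, Clone, PartialEq)]
pub struct CompletedToolCall {
    pub id: String,
    pub name: String,
    pub arguments: String,
}

/// Accumulates fragments per call id and releases a call only on `End`.
#[derive(Default)]
pub struct ToolCallBuffer {
    /// Call id -> (tool name, arguments received so far).
    pending: HashMap<String, (String, String)>,
}

impl ToolCallBuffer {
    /// Feeds one delta in; returns `Some` only when a call has just completed.
    pub fn push(&mut self, delta: ToolCallDelta) -> Option<CompletedToolCall> {
        match delta {
            ToolCallDelta::Start { id, name } => {
                self.pending.insert(id, (name, String::new()));
                None
            }
            ToolCallDelta::Arguments { id, fragment } => {
                // Fragments for unknown ids are dropped rather than guessed at.
                if let Some((_, args)) = self.pending.get_mut(&id) {
                    args.push_str(&fragment);
                }
                None
            }
            ToolCallDelta::End { id } => {
                let (name, arguments) = self.pending.remove(&id)?;
                Some(CompletedToolCall { id, name, arguments })
            }
        }
    }
}
```

Keying the buffer by call id keeps interleaved fragments from concurrent tool calls apart, and the tool only ever sees a complete arguments string.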

StreamingCompletionResponse is what the low-level stream_completion(...).stream() path returns. It wraps the inner stream of chunks and, once the stream has been fully consumed, exposes the aggregated message and the raw provider response. See the API docs for the exact fields.
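The "aggregate becomes available once the stream is consumed" behaviour can be illustrated with a small std-only wrapper. AggregatingStream is a hypothetical type written for this sketch, using a synchronous iterator of String chunks in place of an async stream; it is not how StreamingCompletionResponse is implemented, only one way such a wrapper could behave.

```rust
/// Hypothetical wrapper that passes chunks through unchanged while
/// recording them, and exposes the full text only after exhaustion.
pub struct AggregatingStream<I> {
    inner: I,
    aggregated: String,
    done: bool,
}

impl<I> AggregatingStream<I> {
    pub fn new(inner: I) -> Self {
        Self {
            inner,
            aggregated: String::new(),
            done: false,
        }
    }

    /// Returns the full text, but only once the inner stream has ended.
    pub fn aggregated(&self) -> Option<&str> {
        if self.done {
            Some(&self.aggregated)
        } else {
            None
        }
    }
}

impl<I: Iterator<Item = String>> Iterator for AggregatingStream<I> {
    type Item = String;

    fn next(&mut self) -> Option<String> {
        match self.inner.next() {
            Some(chunk) => {
                // Record the chunk, then hand it to the caller untouched.
                self.aggregated.push_str(&chunk);
                Some(chunk)
            }
            None => {
                // The stream is exhausted: the aggregate is now complete.
                self.done = true;
                None
            }
        }
    }
}
```

Returning None from aggregated() until the stream ends makes it hard to mistake a half-received message for the final one.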

For the common case of printing a stream to the terminal, Rig ships a helper:

use rig::agent::stream_to_stdout;
let mut stream = agent.stream_prompt("Hello!").await;
stream_to_stdout(&mut stream).await?;
Hello! How can I help you today?

stream_to_stdout prints text chunks as they arrive and ignores tool-call deltas, which usually aren’t meaningful to display directly.

PauseControl lets you pause and resume a streaming response — useful for user-controlled streaming in interactive apps:

use rig::streaming::PauseControl;
use std::sync::Arc;
let pause = Arc::new(PauseControl::new());
let pause_clone = Arc::clone(&pause);
// In another task:
pause_clone.pause();
// ...
pause_clone.resume();
  • Handle errors per chunk. Starting a stream (stream_prompt(...).await) always succeeds, but each item in it is a Result that can fail independently. Check every item (for example with item?) rather than assuming the whole stream succeeds or fails atomically.
  • Apply backpressure. Use PauseControl or standard stream backpressure when the consumer can’t keep up with the producer.
  • Read usage at the end. The final usage event reports token counts for the entire completion, not per chunk.
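The "standard stream backpressure" mentioned in the list above can be sketched with a bounded channel from the standard library. This is an illustration of the general technique, not of Rig's PauseControl: relay_with_backpressure and accepted_before_full are hypothetical helpers, and a producer thread stands in for the provider emitting chunks.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};
use std::thread;

/// Relays chunks from a producer thread to the caller through a bounded
/// queue. `send` blocks while the queue already holds `capacity` items,
/// so a slow consumer automatically slows the producer down.
pub fn relay_with_backpressure(chunks: Vec<String>, capacity: usize) -> Vec<String> {
    let (tx, rx) = sync_channel::<String>(capacity);

    let producer = thread::spawn(move || {
        for chunk in chunks {
            // Stop producing if the consumer has gone away.
            if tx.send(chunk).is_err() {
                break;
            }
        }
        // `tx` is dropped here, which ends the consumer loop below.
    });

    let mut received = Vec::new();
    for chunk in rx {
        received.push(chunk);
    }
    producer.join().expect("producer thread panicked");
    received
}

/// Counts how many items a bounded queue accepts when nothing drains it.
/// Once the queue is full, `try_send` reports `Full` instead of blocking.
pub fn accepted_before_full(capacity: usize, attempts: usize) -> usize {
    // `_rx` keeps the receiver alive so the channel stays connected.
    let (tx, _rx) = sync_channel::<usize>(capacity);
    let mut accepted = 0;
    for i in 0..attempts {
        match tx.try_send(i) {
            Ok(()) => accepted += 1,
            Err(TrySendError::Full(_)) | Err(TrySendError::Disconnected(_)) => break,
        }
    }
    accepted
}
```

A small capacity keeps memory bounded at the cost of stalling the producer more often; in an async application the bounded channel of the runtime in use plays the same role.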
  • Completions — the non-streaming completion traits
  • Agents — the agent system these streams come from