Streaming gives users feedback as the model thinks. Cognis exposes two streams on every Runnable, with different jobs.
| Stream | Item | Use for |
| --- | --- | --- |
| stream(input, cfg) | O (chunks of the output) | Token-by-token UI, progressive rendering. |
| stream_events(input, cfg) | Event | Trace UIs, progress bars, observability pipelines. |
If you want to display tokens as they arrive, use stream. If you want to show a tree of “model thinking → calling tool → got result”, use stream_events.

Quick example — token streaming from the LLM

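use std::io::Write;

use cognis::prelude::*;
use futures::StreamExt;

let client = Client::from_env()?;
let mut s = client.stream(vec![Message::human("Tell me a one-line joke.")]).await?;
while let Some(chunk) = s.next().await {
    // Each chunk carries only the newly generated text (the delta).
    print!("{}", chunk?.content);
    // stdout is line-buffered, so flush to show tokens as they arrive.
    std::io::stdout().flush().ok();
}
println!();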
StreamChunk carries the delta (content: String) plus optional usage and finish-reason fields on the last chunk. For tool-using replies, the final chunk holds the assembled tool_calls. Source: examples/v2/04_streaming_chat.rs.
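If you need the complete reply as well as the live tokens, you can fold the chunks as they arrive. The sketch below is a minimal illustration using only the standard library: `Chunk`, `Assembled`, and `assemble` are hypothetical stand-ins, not Cognis types, and the field names `finish_reason` and `total_tokens` are assumptions about what the "usage and finish-reason fields" look like.

```rust
// Hypothetical stand-in for the chunk described above. The real
// StreamChunk in Cognis may name or type these fields differently.
#[derive(Debug, Clone, Default)]
struct Chunk {
    content: String,               // text delta carried by this chunk
    finish_reason: Option<String>, // assumed to be set only on the last chunk
    total_tokens: Option<u32>,     // simplified usage, last chunk only
}

// Accumulated view of a reply, built up one chunk at a time.
#[derive(Debug, Default, PartialEq)]
struct Assembled {
    text: String,
    finish_reason: Option<String>,
    total_tokens: Option<u32>,
}

// Concatenate the deltas and keep the most recent metadata that was set.
fn assemble<I: IntoIterator<Item = Chunk>>(chunks: I) -> Assembled {
    let mut out = Assembled::default();
    for chunk in chunks {
        out.text.push_str(&chunk.content);
        if chunk.finish_reason.is_some() {
            out.finish_reason = chunk.finish_reason;
        }
        if chunk.total_tokens.is_some() {
            out.total_tokens = chunk.total_tokens;
        }
    }
    out
}
```

In a real loop you would call the same accumulation logic inside `while let Some(chunk) = s.next().await`, printing each delta before appending it.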

Quick example — structured events

stream_events works on any Runnable — a chain, a model, a graph, an agent.
use cognis::prelude::*;
use futures::StreamExt;

let mut events = chain.stream_events(input, RunnableConfig::default()).await?;
while let Some(ev) = events.next().await {
    match ev {
        Event::OnLlmToken { token, .. } => print!("{token}"),
        Event::OnToolStart { tool, args, .. } => println!("\n[tool] {tool} {args}"),
        Event::OnToolEnd { tool, .. } => println!("[done {tool}]"),
        Event::OnNodeStart { node, step, .. } => println!("[node {step}] {node}"),
        Event::OnError { error, .. } => eprintln!("[err] {error}"),
        _ => {}
    }
}
Full event variants:

| Variant | Fields |
| --- | --- |
| OnStart / OnEnd | runnable, run_id, input/output |
| OnError | error, run_id |
| OnNodeStart / OnNodeEnd | node, step, run_id, output? |
| OnLlmToken | token, run_id |
| OnToolStart / OnToolEnd | tool, args/result, run_id |
| OnCheckpoint | step, run_id |
| Custom | kind, payload, run_id (user-emitted from a graph node) |
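For a progress bar or trace view, it often helps to reduce the event stream into a small state struct instead of printing each event. The following is a sketch under stated assumptions: `RunEvent` is a local mirror of a few variants from the list above with simplified string fields, and `RunSummary` and `apply` are hypothetical helpers, not part of Cognis.

```rust
// Local mirror of a few event variants, reduced to plain strings.
// This is NOT the cognis Event type; field types are assumed.
#[derive(Debug, Clone)]
enum RunEvent {
    LlmToken { token: String },
    ToolStart { tool: String },
    ToolEnd { tool: String },
    NodeStart { node: String, step: u32 },
    Error { error: String },
    End,
}

// State a UI could render after every event.
#[derive(Debug, Default, PartialEq)]
struct RunSummary {
    text: String,
    tools_in_flight: Vec<String>,
    tools_finished: Vec<String>,
    last_step: Option<u32>,
    error: Option<String>,
    done: bool,
}

// Apply one event to the summary.
fn apply(summary: &mut RunSummary, ev: RunEvent) {
    match ev {
        RunEvent::LlmToken { token } => summary.text.push_str(&token),
        RunEvent::ToolStart { tool } => summary.tools_in_flight.push(tool),
        RunEvent::ToolEnd { tool } => {
            // Move the tool from "in flight" to "finished".
            if let Some(pos) = summary.tools_in_flight.iter().position(|t| *t == tool) {
                summary.tools_in_flight.remove(pos);
            }
            summary.tools_finished.push(tool);
        }
        RunEvent::NodeStart { step, .. } => summary.last_step = Some(step),
        RunEvent::Error { error } => summary.error = Some(error),
        RunEvent::End => summary.done = true,
    }
}
```

Keeping the reduction in one function means the rendering code only ever reads `RunSummary`, which makes it straightforward to test without a live model.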

Filtered graph streaming

For graphs, Cognis exposes stream_mode, which filters events down to a named subset:
use cognis::prelude::*;
use cognis_graph::{StreamMode, StreamModes};
use futures::StreamExt;

let mut events = graph
    .stream_mode(initial, StreamModes::only(StreamMode::Updates), RunnableConfig::default())
    .await?;
while let Some(ev) = events.next().await {
    println!("{:?}", ev);
}
| StreamMode | What it emits |
| --- | --- |
| Values | Whole state at the end (OnEnd). |
| Updates | Per-node deltas (OnNodeEnd). |
| Messages | LLM tokens, tool starts/ends. |
| Tasks | Node-start signals. |
| Checkpoints | Each persisted snapshot. |
| Debug | Everything. |
| Custom | Only Event::Custom payloads (emitted via NodeCtx::write_custom). |
Combine multiple modes with StreamModes::default().push(StreamMode::Updates).push(StreamMode::Messages).
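One way to read the mode list is as a predicate over event kinds. The sketch below encodes that mapping with local enums. `Mode`, `Kind`, `mode_allows`, and `allowed` are hypothetical names, not the cognis_graph API. It also assumes that combining modes yields the union of their events and that OnError passes only under Debug; the text above does not confirm either point.

```rust
// Local mirror of the stream modes listed above.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Mode { Values, Updates, Messages, Tasks, Checkpoints, Debug, Custom }

// Event kinds, named after the event variants without their fields.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Kind {
    Start, End, Error, NodeStart, NodeEnd,
    LlmToken, ToolStart, ToolEnd, Checkpoint, Custom,
}

// Does a single mode let this kind of event through?
fn mode_allows(mode: Mode, kind: Kind) -> bool {
    match mode {
        Mode::Values => kind == Kind::End,
        Mode::Updates => kind == Kind::NodeEnd,
        Mode::Messages => matches!(kind, Kind::LlmToken | Kind::ToolStart | Kind::ToolEnd),
        Mode::Tasks => kind == Kind::NodeStart,
        Mode::Checkpoints => kind == Kind::Checkpoint,
        Mode::Debug => true,
        Mode::Custom => kind == Kind::Custom,
    }
}

// Assumed semantics: an event passes when any selected mode allows it.
fn allowed(modes: &[Mode], kind: Kind) -> bool {
    modes.iter().any(|m| mode_allows(*m, kind))
}
```

Under these assumptions, selecting Updates and Messages together would deliver node deltas, LLM tokens, and tool events, but no checkpoints.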

Streaming inside an agent

let mut events = agent.stream_events(Message::human("…"), cfg).await?;
The surface is the same as for any other Runnable. Because the agent is a Runnable, stream_events produces a tree of model-token, tool-call, and tool-result events nested under the agent’s run.

How it works

  • Streaming uses the same Observer pipe as static observers. The events flow through whatever observers you’ve attached and into the stream.
  • OnLlmToken is emitted by providers that support streaming. For providers that don’t, you’ll see one OnEnd with the full text.
  • Custom is for app-emitted progress. Inside a graph node, call ctx.write_custom("kind", payload) and your UI receives the event.
  • For Runnables that do not stream natively, stream_events calls invoke under the hood. This default emits only OnStart and OnEnd; override stream_events on your custom Runnable when it can report finer-grained progress.
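The fallback in the last bullet can be pictured as a trait with a default method. This is a synchronous miniature for illustration only: `MiniRunnable`, `MiniEvent`, `Upper`, and `Words` are hypothetical, and the real Cognis Runnable trait is async and takes a config argument, so its signature will differ.

```rust
// Simplified event type for this sketch; not the cognis Event enum.
#[derive(Debug, Clone, PartialEq)]
enum MiniEvent {
    Start { input: String },
    Token(String),
    End { output: String },
}

trait MiniRunnable {
    fn invoke(&self, input: &str) -> String;

    // Default: run invoke once and bracket it with Start and End.
    fn stream_events(&self, input: &str) -> Vec<MiniEvent> {
        let output = self.invoke(input);
        vec![
            MiniEvent::Start { input: input.to_string() },
            MiniEvent::End { output },
        ]
    }
}

// Relies on the default, so callers see only Start and End.
struct Upper;
impl MiniRunnable for Upper {
    fn invoke(&self, input: &str) -> String {
        input.to_uppercase()
    }
}

// Overrides the default to emit one Token event per word.
struct Words;
impl MiniRunnable for Words {
    fn invoke(&self, input: &str) -> String {
        input.to_string()
    }

    fn stream_events(&self, input: &str) -> Vec<MiniEvent> {
        let mut events = vec![MiniEvent::Start { input: input.to_string() }];
        events.extend(input.split_whitespace().map(|w| MiniEvent::Token(w.to_string())));
        events.push(MiniEvent::End { output: self.invoke(input) });
        events
    }
}
```

Calling `Upper.stream_events("hi")` yields just the Start and End pair, while `Words.stream_events("a b")` adds a Token event for each word in between.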

See also

  • Runnables → Streaming events: The lower-level shape.
  • Graph workflows → Streaming: Graph-specific filters and the Custom channel.
  • Patterns → Streaming UI: A complete server-side streaming endpoint.