
Streaming gives users feedback as the model thinks. Cognis exposes two streams on every Runnable, with different jobs.

| Stream | Item | Use for |
| --- | --- | --- |
| stream(input, cfg) | O (chunks of the output) | Token-by-token UI, progressive rendering. |
| stream_events(input, cfg) | Event | Trace UIs, progress bars, observability pipelines. |

If you want to display tokens as they arrive, use stream. If you want to show a tree of “model thinking → calling tool → got result”, use stream_events.

Quick example — token streaming from the LLM

use cognis::prelude::*;
use futures::StreamExt;

let client = Client::from_env()?;
let mut s = client.stream(vec![Message::human("Tell me a one-line joke.")]).await?;
while let Some(chunk) = s.next().await {
    print!("{}", chunk?.content);
}
println!();

StreamChunk carries the delta (content: String) plus optional usage and finish-reason fields on the last chunk. For tool-using replies, the final chunk holds the assembled tool_calls. Source: examples/v2/04_streaming_chat.rs.
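
If you need the full text plus that trailing metadata, accumulate the deltas and read the last chunk's fields. A minimal sketch, assuming the last chunk exposes these as Option fields named finish_reason and usage:

use cognis::prelude::*;
use futures::StreamExt;

let client = Client::from_env()?;
let mut s = client.stream(vec![Message::human("Tell me a one-line joke.")]).await?;
let mut full = String::new();
while let Some(chunk) = s.next().await {
    let chunk = chunk?;
    full.push_str(&chunk.content); // accumulate each delta
    if let Some(reason) = &chunk.finish_reason {
        // Assumption: finish_reason/usage are Options, set only on the final chunk.
        eprintln!("[finish: {reason}, usage: {:?}]", chunk.usage);
    }
}
println!("{full}");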

Quick example — structured events

stream_events works on any Runnable — a chain, a model, a graph, an agent.

use cognis::prelude::*;
use futures::StreamExt;

let mut events = chain.stream_events(input, RunnableConfig::default()).await?;
while let Some(ev) = events.next().await {
    match ev {
        Event::OnLlmToken { token, .. } => print!("{token}"),
        Event::OnToolStart { tool, args, .. } => println!("\n[tool] {tool} {args}"),
        Event::OnToolEnd { tool, .. } => println!("[done {tool}]"),
        Event::OnNodeStart { node, step, .. } => println!("[node {step}] {node}"),
        Event::OnError { error, .. } => eprintln!("[err] {error}"),
        _ => {}
    }
}

Full event variants:

| Variant | Fields |
| --- | --- |
| OnStart / OnEnd | runnable, run_id, input/output |
| OnError | error, run_id |
| OnNodeStart / OnNodeEnd | node, step, run_id, output? |
| OnLlmToken | token, run_id |
| OnToolStart / OnToolEnd | tool, args/result, run_id |
| OnCheckpoint | step, run_id |
| Custom | kind, payload, run_id (user-emitted from a graph node) |

Filtered graph streaming

For graphs, Cognis exposes stream_mode, which filters events down to a named subset:
use cognis::prelude::*;
use cognis_graph::{StreamMode, StreamModes};
use futures::StreamExt;

let mut events = graph
    .stream_mode(initial, StreamModes::only(StreamMode::Updates), RunnableConfig::default())
    .await?;
while let Some(ev) = events.next().await {
    println!("{:?}", ev);
}

| StreamMode | What it emits |
| --- | --- |
| Values | Whole state at the end (OnEnd). |
| Updates | Per-node deltas (OnNodeEnd). |
| Messages | LLM tokens, tool starts/ends. |
| Tasks | Node-start signals. |
| Checkpoints | Each persisted snapshot. |
| Debug | Everything. |
| Custom | Only Event::Custom payloads (emitted via NodeCtx::write_custom). |

Combine multiple modes with StreamModes::default().push(StreamMode::Updates).push(StreamMode::Messages).
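
For instance, a sketch that combines those two modes so a UI gets node completions and live tokens in one stream (variant fields as in the table above):

use cognis::prelude::*;
use cognis_graph::{StreamMode, StreamModes};
use futures::StreamExt;

// Updates gives per-node deltas; Messages adds LLM tokens and tool events.
let modes = StreamModes::default()
    .push(StreamMode::Updates)
    .push(StreamMode::Messages);
let mut events = graph.stream_mode(initial, modes, RunnableConfig::default()).await?;
while let Some(ev) = events.next().await {
    match ev {
        Event::OnLlmToken { token, .. } => print!("{token}"),
        Event::OnNodeEnd { node, .. } => println!("\n[{node} finished]"),
        _ => {}
    }
}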

Streaming inside an agent

let mut events = agent.stream_events(Message::human("…"), cfg).await?;

Same surface — the agent is a Runnable, so stream_events produces a tree of model-token, tool-call, and tool-result events nested under the agent’s run.
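
To render that tree, track depth from the start/end events. A sketch: the indentation bookkeeping is ours, it assumes the runnable field displays as a name, and only the Event variants come from the table above.

use cognis::prelude::*;
use futures::StreamExt;

let mut events = agent.stream_events(Message::human("What is 7 * 6? Use the calculator."), cfg).await?;
let mut depth: usize = 0; // our own bookkeeping, not part of the API
while let Some(ev) = events.next().await {
    let pad = "  ".repeat(depth);
    match ev {
        Event::OnStart { runnable, .. } => { println!("{pad}> {runnable}"); depth += 1; }
        Event::OnToolStart { tool, args, .. } => println!("{pad}[tool] {tool} {args}"),
        Event::OnEnd { .. } => depth = depth.saturating_sub(1),
        _ => {}
    }
}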

How it works

  • Streaming uses the same Observer pipe as static observers. The events flow through whatever observers you’ve attached and into the stream.
  • OnLlmToken is emitted by providers that support streaming. For providers that don’t, you’ll see one OnEnd with the full text.
  • Custom is for app-emitted progress. Inside a graph node, call ctx.write_custom("kind", payload) and your UI receives the event; see the sketch after this list.
  • stream_events calls invoke under the hood for non-streaming Runnables. The default implementation emits only OnStart + OnEnd; override it on your custom Runnable when you can do better.
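
As an example, a node that reports progress through the Custom channel could look like this. A sketch only: the node signature, the State shape, and the embed helper are hypothetical; NodeCtx::write_custom and the kind/payload pair come from the docs above, and we assume the payload is a serde_json::Value.

use cognis::prelude::*;
use serde_json::json;

// Hypothetical node body: State, docs, and embed() are illustrative.
async fn embed_docs(ctx: NodeCtx, state: State) -> Result<State> {
    let total = state.docs.len();
    for (i, doc) in state.docs.iter().enumerate() {
        embed(doc).await?; // hypothetical unit of work
        ctx.write_custom("progress", json!({ "done": i + 1, "total": total }));
    }
    Ok(state)
}

On the consuming side, match Event::Custom { kind, payload, .. } (or run the graph with StreamMode::Custom) to route those payloads to your UI.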

See also

Runnables → Streaming events

The lower-level shape.

Graph workflows → Streaming

Graph-specific filters and the Custom channel.

Patterns → Streaming UI

A complete server-side streaming endpoint.