A graph that runs in one shot tells you nothing until it finishes. Streaming surfaces what’s happening as it happens — node started, node ended, LLM token, custom progress signal. Cognis exposes this through the same Event-stream contract used everywhere else, plus a graph-specific stream_mode that filters down to what you care about.

The mental model

Three knobs:
  • stream(input, cfg) — the Runnable default. Emits the final state once.
  • stream_events(input, cfg) — every Event from the run, in order. Use when you want everything.
  • stream_mode(input, modes, cfg) — filter events by StreamMode. Use when you want specific channels (deltas, tokens, checkpoints).

stream_mode

use cognis::prelude::*;
use cognis_graph::{StreamMode, StreamModes};
use futures::StreamExt;

let mut events = graph
    .stream_mode(
        initial,
        StreamModes::only(StreamMode::Updates),  // per-node deltas
        RunnableConfig::default(),
    )
    .await?;

while let Some(ev) = events.next().await {
    if let Event::OnNodeEnd { node, output, .. } = ev {
        println!("node {node} → {output}");
    }
}

StreamMode     What it emits
Values         Whole state at the end (OnEnd).
Updates        Per-node deltas (OnNodeEnd).
Messages       LLM tokens (OnLlmToken), tool starts/ends.
Tasks          Node-start signals (OnNodeStart).
Checkpoints    Each persisted snapshot (OnCheckpoint).
Debug          Everything.
Custom         Only Event::Custom payloads (node-emitted progress).

Combine modes:
let modes = StreamModes::default()
    .push(StreamMode::Updates)
    .push(StreamMode::Messages);
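
To make the mode-to-event mapping in the table concrete, here is a minimal, self-contained sketch of how a set of modes could filter an event stream. The `Event`, `StreamMode`, and `StreamModes` types below are simplified stand-ins written for this illustration. Only their names mirror the ones on this page; the real cognis_graph definitions (fields, extra variants such as tool events, and the exact behavior of `matches`) may differ. `sample_run` and `filter_events` are hypothetical helpers.

```rust
// Toy stand-ins for illustration only; these are NOT the real cognis_graph types.
#[derive(Debug, Clone, PartialEq)]
enum Event {
    OnNodeStart { node: String },
    OnNodeEnd { node: String, output: String },
    OnLlmToken { token: String },
    OnCheckpoint { step: u32 },
    Custom { kind: String, payload: String },
    OnEnd { state: String },
}

#[derive(Debug, Clone, Copy, PartialEq)]
enum StreamMode {
    Values,
    Updates,
    Messages,
    Tasks,
    Checkpoints,
    Debug,
    Custom,
}

/// A set of modes; an event passes if any mode in the set accepts it.
#[derive(Debug, Clone, Default)]
struct StreamModes(Vec<StreamMode>);

impl StreamModes {
    fn only(mode: StreamMode) -> Self {
        StreamModes(vec![mode])
    }

    fn push(mut self, mode: StreamMode) -> Self {
        self.0.push(mode);
        self
    }

    /// Mode + variant filter: a plain `match` over (mode, event) pairs.
    fn matches(&self, ev: &Event) -> bool {
        self.0.iter().any(|mode| match (mode, ev) {
            (StreamMode::Debug, _) => true,
            (StreamMode::Values, Event::OnEnd { .. }) => true,
            (StreamMode::Updates, Event::OnNodeEnd { .. }) => true,
            (StreamMode::Messages, Event::OnLlmToken { .. }) => true,
            (StreamMode::Tasks, Event::OnNodeStart { .. }) => true,
            (StreamMode::Checkpoints, Event::OnCheckpoint { .. }) => true,
            (StreamMode::Custom, Event::Custom { .. }) => true,
            _ => false,
        })
    }
}

/// A hypothetical two-node run, in emission order.
fn sample_run() -> Vec<Event> {
    vec![
        Event::OnNodeStart { node: "plan".into() },
        Event::OnLlmToken { token: "Hel".into() },
        Event::OnLlmToken { token: "lo".into() },
        Event::OnNodeEnd { node: "plan".into(), output: "{\"steps\":2}".into() },
        Event::Custom { kind: "progress".into(), payload: "planning".into() },
        Event::OnCheckpoint { step: 1 },
        Event::OnNodeStart { node: "execute".into() },
        Event::OnNodeEnd { node: "execute".into(), output: "done".into() },
        Event::OnEnd { state: "final".into() },
    ]
}

/// Keep only the events accepted by `modes`, preserving order.
fn filter_events(modes: &StreamModes, events: &[Event]) -> Vec<Event> {
    events.iter().filter(|ev| modes.matches(ev)).cloned().collect()
}
```

In this sketch, `StreamModes::only(StreamMode::Updates)` keeps just the two `OnNodeEnd` events from `sample_run`, and pushing `Messages` as well adds the two token events. An empty set matches nothing.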

Custom events from a node

Inside a node_fn, the NodeCtx argument exposes a custom-event channel:
let plan = node_fn::<S, _, _>("plan", |state, ctx| async move {
    ctx.write_custom("progress", serde_json::json!({"phase": "planning"}));
    // …
    Ok(NodeOut { update: SUpdate::default(), goto: Goto::node("execute") })
});
Subscribe with StreamMode::Custom to receive only these:
let mut events = graph
    .stream_mode(initial, StreamModes::only(StreamMode::Custom), cfg)
    .await?;
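// Only Custom events arrive in this mode, so the refutable pattern is safe here;
// in a mixed-mode stream, the first non-Custom event would end this loop.
while let Some(Event::Custom { kind, payload, .. }) = events.next().await {
    match kind.as_str() {
        "progress" => update_ui(payload), // update_ui: your own UI hook
        _ => {}                            // ignore kinds this consumer does not handle
    }
}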
This is how you wire a progress bar without coupling the node to your UI’s event format.
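
One way to keep the consumer side equally decoupled is to translate each (kind, payload) pair into a UI-owned type at the edge of the loop. The sketch below is self-contained and uses plain strings where the snippets above use a JSON payload. `UiUpdate`, `to_ui_update`, `collect_updates`, and the "percent" kind are hypothetical names chosen for illustration; none of them are part of Cognis.

```rust
/// UI-owned representation; the node that emits the event never sees this type.
#[derive(Debug, PartialEq)]
enum UiUpdate {
    Phase(String),
    Percent(u8),
}

/// Translate one custom event into a UI update.
/// Unknown kinds and malformed payloads are ignored rather than treated as errors.
fn to_ui_update(kind: &str, payload: &str) -> Option<UiUpdate> {
    match kind {
        "progress" => Some(UiUpdate::Phase(payload.to_string())),
        // Hypothetical second kind: a numeric percentage, clamped to 100.
        "percent" => payload.parse::<u8>().ok().map(|p| UiUpdate::Percent(p.min(100))),
        _ => None,
    }
}

/// Apply the translation to a batch of (kind, payload) pairs, preserving order.
fn collect_updates(events: &[(&str, &str)]) -> Vec<UiUpdate> {
    events
        .iter()
        .filter_map(|&(kind, payload)| to_ui_update(kind, payload))
        .collect()
}
```

With this shape, adding a new custom kind in a node only requires one new match arm on the consumer side, and the node never needs to know how the UI renders it.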

Streaming inside an agent

Agent::stream_events works the same way — the events come out nested under the agent’s run, with node-start / node-end events for the agent’s internal model / tools nodes plus the standard OnLlmToken / OnToolStart / OnToolEnd family.

How it works

  • Streaming is just an observer dressed up. Internally, stream_mode attaches an observer that pushes events into a channel. Same observer trait, different sink.
  • Filtering is by mode + variant. StreamModes::matches is just a match over the event enum.
  • Backpressure is real. If the consumer is slow, the channel fills up. The engine eventually waits — make sure your consumer drains.
  • stream_events does not wait for the run to finish; it returns the stream right away and drives the run in the background. The first event may already be queued by the time you reach the second await (the first call to events.next()).
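
The points above (an observer that pushes into a channel, a run that proceeds in the background, and backpressure from a full channel) can be sketched with the standard library alone. This is an analogy under stated assumptions, not Cognis's implementation: it uses OS threads and a bounded `std::sync::mpsc` channel in place of whatever async runtime and channel Cognis uses, and `Observer`, `ChannelSink`, the free function `stream_events`, and `second_send_is_rejected_when_full` are hypothetical names.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};
use std::thread;

/// Hypothetical observer trait: receives every event the run produces.
trait Observer: Send {
    fn on_event(&mut self, event: String);
}

/// An observer whose sink is a bounded channel.
struct ChannelSink {
    tx: SyncSender<String>,
}

impl Observer for ChannelSink {
    fn on_event(&mut self, event: String) {
        // `send` blocks while the channel is full: this is the backpressure.
        // A send error means the consumer hung up, so the event is dropped.
        let _ = self.tx.send(event);
    }
}

/// Start a toy "run" on a background thread and return the receiving end
/// immediately, before the run has finished.
fn stream_events(steps: usize, capacity: usize) -> Receiver<String> {
    let (tx, rx) = sync_channel(capacity);
    thread::spawn(move || {
        let mut observer = ChannelSink { tx };
        for i in 0..steps {
            observer.on_event(format!("node-{}-end", i));
        }
        // Dropping the observer drops the sender, which ends the stream.
    });
    rx
}

/// Show the "channel fills up" condition without blocking:
/// with capacity 1 and no consumer draining, a second send is rejected as Full.
fn second_send_is_rejected_when_full() -> bool {
    let (tx, _rx) = sync_channel::<u32>(1); // keep `_rx` alive so the channel stays open
    tx.try_send(1).expect("first send fits in the buffer");
    matches!(tx.try_send(2), Err(TrySendError::Full(_)))
}
```

Draining the receiver with `stream_events(5, 2).iter()` yields all five events in order even though the buffer only holds two, because the producer thread waits each time the buffer is full. A consumer that stops draining would stall the producer at that same point.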

See also

  • Building agents → Streaming: the agent-level surface.
  • Observability → Callbacks: same events, attached to a long-lived observer.
  • Patterns → Streaming UI: a complete server-side streaming endpoint.