Modern LLM UIs expect tokens to appear as the model generates them. This pattern wires up an axum endpoint (the same shape works with actix, warp, or raw hyper) that runs an agent and streams structured events back to the client as Server-Sent Events (SSE). Any frontend that can read SSE can consume it.
What you’ll build
A POST endpoint at /chat that takes a user message, runs an agent, and streams tokens, tool starts, and tool ends back to the client until the agent finishes.
How it works
agent.stream_events(...) turns the agent into an event stream — the same Runnable contract used everywhere else.
An axum handler maps each Event to an SSE frame.
The frontend reads the SSE stream and renders tokens as they arrive, plus a sidebar of tool events.
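Concretely, what the browser sees is plain SSE text. A run with one tool call might produce frames shaped like this (illustrative, not captured output):

```text
event: token
data: The

event: tool_start
data: {"tool":"search","args":{"query":"weather"}}

event: tool_end
data: {"tool":"search","result":"72F, sunny"}

event: token
data:  answer continues

event: done
data: {"output":"..."}
```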
The handler
```rust
use std::sync::Arc;

use axum::{
    extract::State,
    response::sse::{Event as SseEvent, Sse},
    routing::post,
    Json, Router,
};
use futures::stream::{Stream, StreamExt};
use serde::Deserialize;

use cognis::prelude::*;

#[derive(Clone)]
struct AppState {
    agent: Arc<tokio::sync::Mutex<cognis::Agent>>,
}

#[derive(Deserialize)]
struct ChatRequest {
    message: String,
}

async fn chat_handler(
    State(state): State<AppState>,
    Json(req): Json<ChatRequest>,
) -> Sse<impl Stream<Item = std::result::Result<SseEvent, std::convert::Infallible>>> {
    let mut agent = state.agent.lock().await;
    let events = agent
        .stream_events(Message::human(req.message), RunnableConfig::default())
        .await
        .expect("stream_events");

    let stream = events.map(|ev: Event| {
        // Map every Cognis Event to an SSE frame.
        let frame = match ev {
            Event::OnLlmToken { token, .. } => SseEvent::default().event("token").data(token),
            Event::OnToolStart { tool, args, .. } => SseEvent::default()
                .event("tool_start")
                .json_data(serde_json::json!({ "tool": tool, "args": args }))
                .unwrap(),
            Event::OnToolEnd { tool, result, .. } => SseEvent::default()
                .event("tool_end")
                .json_data(serde_json::json!({ "tool": tool, "result": result }))
                .unwrap(),
            Event::OnEnd { output, .. } => {
                SseEvent::default().event("done").json_data(output).unwrap()
            }
            Event::OnError { error, .. } => SseEvent::default().event("error").data(error),
            _ => SseEvent::default().event("ping").data(""),
        };
        Ok(frame)
    });

    Sse::new(stream)
}

#[tokio::main]
async fn main() -> Result<()> {
    let agent = build_my_agent()?;
    let state = AppState {
        agent: Arc::new(tokio::sync::Mutex::new(agent)),
    };
    let app = Router::new().route("/chat", post(chat_handler)).with_state(state);
    axum::serve(tokio::net::TcpListener::bind("127.0.0.1:3000").await?, app).await?;
    Ok(())
}
```
Add axum, tokio, futures, serde, and serde_json to your Cargo.toml alongside cognis.
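A minimal dependency set might look like this (the version numbers are illustrative, not pinned by this guide):

```toml
[dependencies]
axum = "0.8"
tokio = { version = "1", features = ["full"] }
futures = "0.3"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
cognis = "*"  # pick the release you're building against
```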
The frontend
```js
const res = await fetch("/chat", {
  method: "POST",
  headers: { "Content-Type": "application/json" }, // Axum's Json extractor checks this
  body: JSON.stringify({ message: input.value }),
});

// Use any SSE-aware reader. EventSource doesn't support POST,
// so use a streaming fetch:
const reader = res.body.getReader();
const decoder = new TextDecoder();
let buf = "";
while (true) {
  const { value, done } = await reader.read();
  if (done) break;
  buf += decoder.decode(value, { stream: true });
  // Parse `event: …\ndata: …\n\n` frames. SSE allows multiple data: lines per
  // frame; concatenate them with \n.
  const frames = buf.split("\n\n");
  buf = frames.pop();
  for (const frame of frames) {
    let eventType = "message";
    const dataLines = [];
    for (const line of frame.split("\n")) {
      if (line.startsWith("event: ")) eventType = line.slice(7);
      else if (line.startsWith("data: ")) dataLines.push(line.slice(6));
    }
    const data = dataLines.join("\n");
    if (eventType === "token") output.append(data);
    else if (eventType === "tool_start") sidebar.add(JSON.parse(data));
    // …
  }
}
```
How it works
stream_events starts the agent immediately and returns a stream of events. The first frames arrive before the agent finishes.
Per-message agent ownership. The handler locks the agent for one request — fine for low-volume; for high-throughput, build a fresh agent per request (cheap; just a builder pass).
Backpressure is real. If the client is slow, the channel fills and the agent eventually blocks. SSE typically streams fast enough; for slow clients, use a bounded channel and drop on overflow.
Errors are events. An OnError frame ends the stream; the frontend should display it instead of treating it as a stuck connection.
Variations
| Variation | What changes |
| --- | --- |
| WebSockets | Same logic; emit JSON-encoded frames over a WebSocket message instead of SSE. |
| Server-side filtering | If your frontend only renders tokens, drop everything except OnLlmToken before yielding. |
| Multi-tenant | Add a thread_id header → pass to RunnableConfig → wire a Checkpointer for per-user state. See Stateful chat. |
| Cancellation | Pass a CancellationToken on the config; cancel it when the client disconnects. |
Pair with tracing
Wire cognis-trace and the same events stream into Langfuse — see Trace with Langfuse . Now you have a UI showing tokens to the user and a trace dashboard showing every run.
See also
Building agents → Streaming: the Runnable-level streaming surface.
Graph workflows → Streaming: filters and custom event channels.
Patterns → Stateful chat: add memory and persistence.