

Every piece of Cognis — prompts, models, parsers, tools, retrievers, agents, compiled graphs — implements the same trait: Runnable<I, O>. If you’ve used pipes in a shell, you already know the mental model. The difference is that the types flow through the composition, so the compiler catches plumbing mistakes before you run anything.

What it is

pub trait Runnable<I, O>: Send + Sync {
    async fn invoke(&self, input: I, config: RunnableConfig) -> Result<O>;
    // batch, stream, stream_events, name, schemas — all provided defaults
}
Implement invoke and you get batch (concurrent multi-input), stream (per-output stream), stream_events (lifecycle events), and a default name() for free.
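To make the "defaults call invoke" idea concrete, here is a minimal synchronous sketch. Step is a hypothetical stand-in, not the real async trait, and the real batch default runs inputs concurrently (bounded by max_concurrency) rather than in order:

```rust
// Sketch only: a simplified, synchronous analogue of Runnable.
// Step and Doubler are illustrative names, not the cognis API.
trait Step<I, O> {
    // The one required method.
    fn invoke(&self, input: I) -> O;

    // A provided default built on invoke: one call per input, in order.
    // (The real batch is async and concurrent.)
    fn batch(&self, inputs: Vec<I>) -> Vec<O> {
        inputs.into_iter().map(|i| self.invoke(i)).collect()
    }
}

struct Doubler;

impl Step<u32, u32> for Doubler {
    fn invoke(&self, n: u32) -> u32 {
        n * 2
    }
}

fn main() {
    // Implementing invoke alone is enough to get batch for free.
    println!("{:?}", Doubler.batch(vec![1, 2, 3])); // [2, 4, 6]
}
```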

Why typed

In Python, every Runnable shrugs and types its input/output as Any. In Cognis, a model is Runnable<Vec<Message>, Message>. A parser is Runnable<Message, Recipe>. Pipe them and the compiler refuses anything that doesn’t line up:
use cognis::prelude::*;

// Type flows: String → Vec<Message> → Message → Recipe
let chain = prompt.pipe(model).pipe(parser);
let recipe: Recipe = chain.invoke("scrambled eggs".into(), cfg).await?;
You don’t reach for serde_json::Value until something actually serializes — usually at a system boundary like an HTTP response or a checkpoint write. Inside your composition, types are real.
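The compile-time guarantee can be seen in miniature. The following is an illustrative, synchronous stand-in (Step, Pipe, Parse, and Double are hypothetical names, not the real cognis types) showing why the middle type of a pipe has to line up:

```rust
use std::marker::PhantomData;

// Sketch only: a synchronous stand-in for Runnable<I, O>.
trait Step<I, O> {
    fn invoke(&self, input: I) -> O;
}

// The middle type M must be produced by A and consumed by B;
// a mismatch is a compile error, not a runtime surprise.
struct Pipe<A, B, M>(A, B, PhantomData<M>);

impl<I, M, O, A: Step<I, M>, B: Step<M, O>> Step<I, O> for Pipe<A, B, M> {
    fn invoke(&self, input: I) -> O {
        self.1.invoke(self.0.invoke(input))
    }
}

struct Parse; // String -> u32
impl Step<String, u32> for Parse {
    fn invoke(&self, s: String) -> u32 {
        s.trim().parse().unwrap_or(0)
    }
}

struct Double; // u32 -> u32
impl Step<u32, u32> for Double {
    fn invoke(&self, n: u32) -> u32 {
        n * 2
    }
}

fn main() {
    // Type flows: String -> u32 -> u32.
    let chain = Pipe(Parse, Double, PhantomData);
    println!("{}", chain.invoke(" 21 ".to_string())); // 42
    // Pipe(Double, Parse, PhantomData) would not compile: u32 != String.
}
```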

When to care

  • You’re building a chain by hand and want compile-time guarantees.
  • You’re writing a custom primitive (a fancy retriever, a domain-specific parser) and want everything else to compose with it for free.
  • You want consistent observability — stream_events and observers work on any Runnable.
If you’re using a high-level construct like AgentBuilder, you rarely implement Runnable yourself. But the agent itself, the tools it calls, and the LLM client it wraps are all Runnable underneath, which is why the same wrappers (retry, timeout, fallback) work on all of them.

Quick example

A custom Runnable that doubles a number:
use async_trait::async_trait;
use cognis::prelude::*;
use futures::StreamExt;

struct Doubler;

#[async_trait]
impl Runnable<u32, u32> for Doubler {
    async fn invoke(&self, input: u32, _: RunnableConfig) -> Result<u32> {
        Ok(input * 2)
    }
}

#[tokio::main]
async fn main() -> Result<()> {
    let r = Doubler;

    // Single shot.
    println!("{}", r.invoke(5, RunnableConfig::default()).await?);

    // Concurrent batch — honors RunnableConfig.max_concurrency.
    let v = r.batch(vec![1, 2, 3, 4], RunnableConfig::default()).await?;
    println!("{v:?}"); // [2, 4, 6, 8]

    // Default stream emits one item.
    let mut s = r.stream(7, RunnableConfig::default()).await?;
    while let Some(item) = s.next().await {
        println!("{}", item?);
    }
    Ok(())
}
Source: examples/v2/01_hello_runnable.rs.

Composition

Once you have a Runnable, there are three ways to compose it: sequential pipes, branching, and parallel fan-out. The simplest is pipe:
use cognis::prelude::*;

let chain = a.pipe(b).pipe(c);
let out = chain.invoke(input, cfg).await?;
Branching (Branch) and parallel fan-out (Parallel) live in cognis_core::compose with the same shape — see Reference → cognis-core.
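To show the shape a fan-out shares with pipe, here is a synchronous sketch of running two steps on the same input and pairing their outputs. Step, Fanout, Len, and Upper are illustrative names, not the cognis_core::compose API, and the real combinator runs its branches concurrently:

```rust
// Sketch only: a synchronous stand-in for parallel fan-out.
trait Step<I, O> {
    fn invoke(&self, input: I) -> O;
}

// Both branches consume the same input (cloned) and their
// outputs come back as a tuple, so the result stays typed.
struct Fanout<A, B>(A, B);

impl<I: Clone, O1, O2, A: Step<I, O1>, B: Step<I, O2>> Step<I, (O1, O2)> for Fanout<A, B> {
    fn invoke(&self, input: I) -> (O1, O2) {
        (self.0.invoke(input.clone()), self.1.invoke(input))
    }
}

struct Len;
impl Step<String, usize> for Len {
    fn invoke(&self, s: String) -> usize {
        s.len()
    }
}

struct Upper;
impl Step<String, String> for Upper {
    fn invoke(&self, s: String) -> String {
        s.to_uppercase()
    }
}

fn main() {
    let both = Fanout(Len, Upper);
    let (n, up) = both.invoke("eggs".to_string());
    println!("{n} {up}"); // 4 EGGS
}
```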

Wrappers

Cross-cutting behavior comes through RunnableExt, in scope via the prelude:
use std::time::Duration;
use cognis::prelude::*;

let resilient = client
    .with_max_retries(3)
    .with_timeout(Duration::from_secs(30));
  • pipe(next) — Sequential composition.
  • with_retry(policy) / with_max_retries(n) — Retry on Err.
  • with_timeout(Duration) — Bound a single invoke.
  • with_fallback(other) — Try a backup on error.
  • with_memory_cache(key_fn) — Hash-keyed in-memory cache.
  • each() — Apply per-element to a Vec<I>.

Streaming events

Every Runnable can emit a structured event stream — useful for trace UIs, progress bars, and the observability stack.
use cognis::prelude::*;
use futures::StreamExt;

let mut events = chain.stream_events(input, RunnableConfig::default()).await?;
while let Some(ev) = events.next().await {
    match ev {
        Event::OnLlmToken { token, .. } => print!("{token}"),
        Event::OnToolStart { tool, args, .. } => println!("[tool] {tool}({args})"),
        Event::OnError { error, .. } => eprintln!("[err] {error}"),
        _ => {}
    }
}
The full enum: OnStart, OnEnd, OnError, OnNodeStart, OnNodeEnd, OnLlmToken, OnToolStart, OnToolEnd, OnCheckpoint, Custom.

How it works

  • invoke is the only required method. Everything else has a default that calls it.
  • stream_events is the source of trace data. Observers attached via RunnableConfig::with_observer see every event.
  • Composition returns concrete types. a.pipe(b) is Pipe<A, B, …>, not a trait object — no boxing in the hot path.
  • Wrappers are wrappers. with_retry returns Retry<Self, …>, which is itself a Runnable. They compose.
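The "wrappers are wrappers" point is easy to sketch: a retry wrapper holds its inner value and implements the same trait, so it composes like anything else. Step, Retry, and Flaky below are hypothetical synchronous stand-ins for the real async types, not the cognis implementation:

```rust
use std::cell::Cell;

// Sketch only: a synchronous stand-in for Runnable with fallible output.
trait Step<I, O> {
    fn invoke(&self, input: I) -> Result<O, String>;
}

// Retry wraps any Step and is itself a Step, so wrappers stack.
struct Retry<R> {
    inner: R,
    max: u32,
}

impl<I: Clone, O, R: Step<I, O>> Step<I, O> for Retry<R> {
    fn invoke(&self, input: I) -> Result<O, String> {
        let mut last = String::new();
        for _ in 0..=self.max {
            match self.inner.invoke(input.clone()) {
                Ok(v) => return Ok(v),
                Err(e) => last = e,
            }
        }
        Err(last)
    }
}

// A step that fails its first two calls, then succeeds.
struct Flaky {
    calls: Cell<u32>,
}

impl Step<u32, u32> for Flaky {
    fn invoke(&self, n: u32) -> Result<u32, String> {
        let c = self.calls.get();
        self.calls.set(c + 1);
        if c < 2 {
            Err("transient".into())
        } else {
            Ok(n * 2)
        }
    }
}

fn main() {
    let resilient = Retry { inner: Flaky { calls: Cell::new(0) }, max: 3 };
    println!("{:?}", resilient.invoke(5)); // Ok(10)
}
```

Because Retry<R> implements the trait, wrapping it again (a timeout around a retry, say) is just another layer of the same pattern.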

See also

Messages and content

What flows through chat-shaped Runnables.

Building agents → Streaming

stream for tokens, stream_events for structure.

Reference → cognis-core

The full method list and provided defaults.