Indexing turns documents into a searchable vector store. Doing it once is easy. The cost lies in doing it over and over: when source files change, when you change the chunking strategy, when you add new docs. Cognis’ IndexingPipeline makes incremental indexing cheap: tell it which docs you care about, give it a way to key them, and it re-embeds only what changed.

The shape

Loader → Splitter → (RecordManager: which keys changed?) → Embedder → VectorStore

You provide the loader (where docs come from), splitter (how to chunk), record manager (where to remember fingerprints), and store (where vectors live). The pipeline does the work.

Quick example

use std::sync::{Arc, Mutex};
use tokio::sync::RwLock;
use async_trait::async_trait;
use cognis::prelude::*;
use cognis_rag::loaders::{DocumentLoader, DocumentStream};
use cognis_rag::{
    CharacterSplitter, Document, Embeddings, FakeEmbeddings, InMemoryRecordManager,
    InMemoryVectorStore, IndexingPipeline,
};
use futures::stream;

struct VecLoader(Arc<Mutex<Vec<Document>>>);

#[async_trait]
impl DocumentLoader for VecLoader {
    async fn load(&self) -> Result<DocumentStream> {
        let v = self.0.lock().unwrap().clone();
        Ok(Box::pin(stream::iter(v.into_iter().map(Ok))))
    }
}

#[tokio::main]
async fn main() -> Result<()> {
    let emb: Arc<dyn Embeddings> = Arc::new(FakeEmbeddings::new(8));
    let store = Arc::new(RwLock::new(InMemoryVectorStore::new(emb)));
    let manager = InMemoryRecordManager::default();
    let docs = Arc::new(Mutex::new(vec![
        Document::new("doc one v1").with_id("a"),
        Document::new("doc two v1").with_id("b"),
    ]));

    let pipeline = IndexingPipeline::new(
        VecLoader(docs.clone()),
        CharacterSplitter::new().with_chunk_size(200),
        store.clone(),
    );

    // First run: everything is new.
    let r1 = pipeline.run_incremental(&manager, "g1", |d| d.id.clone()).await?;
    println!("added={} changed={} unchanged={} deleted={}",
        r1.added, r1.changed, r1.unchanged, r1.deleted);

    // Edit doc B, add doc C.
    *docs.lock().unwrap() = vec![
        Document::new("doc one v1").with_id("a"),                 // unchanged
        Document::new("doc two v2 changed").with_id("b"),         // changed
        Document::new("doc three new").with_id("c"),              // added
    ];

    let r2 = pipeline.run_incremental(&manager, "g1", |d| d.id.clone()).await?;
    println!("added={} changed={} unchanged={} deleted={}",
        r2.added, r2.changed, r2.unchanged, r2.deleted);
    Ok(())
}

Source: examples/retrieval/indexing_rag.rs. The second run reports changed=1, added=1, unchanged=1: only docs B and C were embedded, and doc A wasn’t re-embedded.

Incremental vs full

Two run methods, two trade-offs:

| Method | Behavior | When |
|---|---|---|
| pipeline.run().await? | Re-index everything. | Initial population. Splitter or embedder change. |
| pipeline.run_incremental(record_manager, group, key_fn).await? | Only re-embed new or changed docs. | Steady state. Most production loops. |

group is a namespace for the record manager, so you can keep multiple indices in one manager (e.g., "docs", "code", "tickets"). key_fn returns the document key the record manager uses for fingerprinting. The simplest choice: |d| d.id.clone(). For sources without stable IDs, hash the content.
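
For the "hash the content" case, a minimal sketch of a content-derived key follows. It assumes you want a hash that stays the same across toolchains, so it hand-rolls 64-bit FNV-1a instead of relying on the standard library's default hasher, whose algorithm is unspecified. The names fnv1a_64 and content_key are hypothetical helpers, not part of Cognis, and how you reach a document's text from inside key_fn depends on the Document type.

```rust
/// 64-bit FNV-1a over raw bytes. The algorithm is fixed, so the same input
/// yields the same value on every Rust release and platform.
fn fnv1a_64(bytes: &[u8]) -> u64 {
    const OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const PRIME: u64 = 0x0000_0100_0000_01b3;
    let mut hash = OFFSET_BASIS;
    for &b in bytes {
        hash ^= u64::from(b);
        hash = hash.wrapping_mul(PRIME);
    }
    hash
}

/// Hypothetical helper (not a Cognis API): derive a key from document text
/// when the source offers no stable id.
fn content_key(text: &str) -> String {
    format!("{:016x}", fnv1a_64(text.as_bytes()))
}

fn main() {
    let first = content_key("doc one v1");
    let again = content_key("doc one v1");
    let edited = content_key("doc one v2");

    // Identical text maps to the same key; edited text maps to a different one.
    assert_eq!(first, again);
    assert_ne!(first, edited);
    println!("key for unchanged text: {first}");
    println!("key for edited text:    {edited}");
}
```

One consequence of this design: because the key itself changes when the text changes, an edited document would presumably show up as one deleted key plus one added key rather than as a single changed key. FNV-1a is also not collision-resistant, so a cryptographic hash is the safer choice for large or untrusted corpora.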

Record managers

The record manager is the bookkeeping layer. It stores fingerprints (key + content hash) so the pipeline can spot changes without diffing the vector store.

| Implementation | Notes |
|---|---|
| InMemoryRecordManager::default() | Lives in process. Lost on restart. |
| (custom) | Implement RecordManager for SQLite, Postgres, Redis, S3. |

For long-running services, plug in a persistent record manager so a restart doesn’t trigger full re-indexing.
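
To make the persistence idea concrete, here is a sketch of the state such a manager has to survive a restart with: a map from (group, key) to fingerprint, written out and read back. This is an assumption-laden illustration, not the RecordManager trait itself, whose methods are not shown on this page. The Records alias, the tab-separated line format, and both function names are hypothetical.

```rust
use std::collections::BTreeMap;

/// Hypothetical persisted shape: (group, key) -> content fingerprint.
type Records = BTreeMap<(String, String), u64>;

/// Write one "group<TAB>key<TAB>hex-fingerprint" line per record.
/// Assumes groups and keys contain no tabs or newlines.
fn serialize(records: &Records) -> String {
    records
        .iter()
        .map(|((group, key), fp)| format!("{group}\t{key}\t{fp:016x}\n"))
        .collect()
}

/// Parse the format produced by `serialize`, skipping malformed lines.
fn deserialize(text: &str) -> Records {
    text.lines()
        .filter_map(|line| {
            let mut parts = line.splitn(3, '\t');
            let group = parts.next()?;
            let key = parts.next()?;
            let fp = u64::from_str_radix(parts.next()?, 16).ok()?;
            Some(((group.to_string(), key.to_string()), fp))
        })
        .collect()
}

fn main() {
    let mut records = Records::new();
    records.insert(("docs".to_string(), "a".to_string()), 0x1111);
    records.insert(("code".to_string(), "a".to_string()), 0x2222);

    // Round-trip through the text form, as a restart would.
    let saved = serialize(&records);
    let restored = deserialize(&saved);
    assert_eq!(restored, records);
    print!("{saved}");
}
```

In a real service the string would go to durable storage (std::fs::write, a database row, an object store) and a custom RecordManager implementation would load it at startup. Keeping the group in the map key is what lets one manager hold several indices side by side.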

How “changed” is detected

The pipeline computes a stable content fingerprint per doc:

  • Same key + same content hash = unchanged.
  • Different content for the same key = changed (re-embed and replace).
  • New key = added.
  • Key seen previously but not in this load = deleted (removed from the store).

Fingerprint stability matters: since v0.3.0 the algorithm is locked to be deterministic across Rust releases (see PR #26). You won’t get spurious re-indexing from compiler upgrades.
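
The four outcomes described here reduce to comparing two key-to-fingerprint maps: what was recorded last time and what the current load produced. The sketch below illustrates that comparison only. DiffCounts and classify are hypothetical names, the fingerprints are made-up integers, and the real pipeline's internals may differ.

```rust
use std::collections::HashMap;

/// Hypothetical tally mirroring the four outcomes (not a Cognis type).
#[derive(Debug, Default, PartialEq)]
struct DiffCounts {
    added: usize,
    changed: usize,
    unchanged: usize,
    deleted: usize,
}

/// `previous`: key -> fingerprint recorded by the last run.
/// `current`:  key -> fingerprint computed from this load.
fn classify(previous: &HashMap<String, u64>, current: &HashMap<String, u64>) -> DiffCounts {
    let mut counts = DiffCounts::default();
    for (key, fingerprint) in current {
        match previous.get(key) {
            None => counts.added += 1,                              // new key
            Some(old) if old == fingerprint => counts.unchanged += 1, // same key, same hash
            Some(_) => counts.changed += 1,                         // same key, new hash
        }
    }
    // Keys recorded earlier that are absent from this load.
    counts.deleted = previous
        .keys()
        .filter(|key| !current.contains_key(*key))
        .count();
    counts
}

fn main() {
    // Mirrors the quick example: a is untouched, b is edited, c is new.
    let previous = HashMap::from([("a".to_string(), 1), ("b".to_string(), 2)]);
    let current = HashMap::from([
        ("a".to_string(), 1),
        ("b".to_string(), 3),
        ("c".to_string(), 4),
    ]);
    let counts = classify(&previous, &current);
    assert_eq!(
        counts,
        DiffCounts { added: 1, changed: 1, unchanged: 1, deleted: 0 }
    );
    println!("{counts:?}");
}
```

Only the added and changed keys need to reach the embedder, which is where the savings over a full run come from.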

How it works

  • Splitting happens before fingerprinting. Each chunk inherits the parent doc’s key and is fingerprinted alongside it.
  • The store is updated atomically per doc. Old chunks for a changed doc are deleted before new ones are added, so there’s no window where the old and new versions are both queryable.
  • Errors per doc don’t poison the run. A failing embedder call for one document is reported in the result but doesn’t prevent others from indexing.
  • Concurrency follows RunnableConfig::max_concurrency. Tune for your embedder’s rate limits.
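
The per-doc replacement and error-isolation behavior in the list above can be pictured with a toy store. Everything here is a stand-in chosen for illustration: ToyStore, embed, and index_all are hypothetical, "chunks" are just whitespace-separated words, and the real pipeline works against an actual embedder and vector store.

```rust
use std::collections::HashMap;

/// Toy stand-in for a vector store: doc key -> that doc's chunks.
#[derive(Default)]
struct ToyStore {
    chunks: HashMap<String, Vec<String>>,
}

impl ToyStore {
    /// Drop any old chunks for `key`, then write the new ones. The `&mut self`
    /// borrow means no reader can observe the store between the two steps.
    fn replace_doc(&mut self, key: &str, new_chunks: Vec<String>) {
        self.chunks.remove(key);
        self.chunks.insert(key.to_string(), new_chunks);
    }
}

/// Hypothetical embed step that fails on empty input.
fn embed(text: &str) -> Result<Vec<String>, String> {
    if text.is_empty() {
        return Err("empty document".to_string());
    }
    Ok(text.split_whitespace().map(str::to_string).collect())
}

/// Index every doc. A failure is recorded against its key and the loop moves on.
fn index_all(store: &mut ToyStore, docs: &[(&str, &str)]) -> Vec<(String, String)> {
    let mut failures = Vec::new();
    for &(key, text) in docs {
        match embed(text) {
            Ok(chunks) => store.replace_doc(key, chunks),
            Err(err) => failures.push((key.to_string(), err)),
        }
    }
    failures
}

fn main() {
    let mut store = ToyStore::default();
    store.replace_doc("b", vec!["two".to_string(), "v1".to_string()]);

    let docs = [("a", "doc one"), ("bad", ""), ("b", "two v2 changed")];
    let failures = index_all(&mut store, &docs);

    // The failing doc is reported; the docs around it are still indexed.
    assert_eq!(failures.len(), 1);
    assert_eq!(store.chunks["b"], vec!["two", "v2", "changed"]);
    println!("failures: {failures:?}");
}
```

Collecting failures instead of returning on the first error is the design choice that keeps one bad document from blocking the rest of the run.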

See also

  • Documents and splitters: what goes into the pipeline.
  • Embeddings and vector stores: what comes out.
  • Patterns → Code Q&A: a worked indexing-then-retrieval flow.