naaf - Not Another Agent Framework

naaf is a strongly typed Rust library for building asynchronous LLM workflows out of retrying steps. It provides composable, type-safe building blocks for workflows that plan, validate, repair, and rerun.

Core Philosophy

Every workflow follows the core loop: run a task, validate the output, and if validation fails, repair the input and try again. Steps compose sequentially, in parallel, or as dynamic graphs — all with full tracing support.

Status

naaf is early-stage. APIs may change between releases.

Features

  • Type-safe workflows — Your Rust types flow through the entire pipeline. No stringly-typed dictionaries.
  • Composable steps — Sequential, parallel, and dynamic graph composition.
  • Built-in retry — Task-check-repair loop with configurable retry policies.
  • Observability — Full tracing support for debugging and monitoring.
  • Multiple backends — LLM integration (OpenAI, Anthropic), shell commands, and vector databases.

Crates

Crate                     Description
naaf-core                 Core traits, Step builder, Workflow runtime
naaf-llm                  LLM-backed adapters, tool calling
naaf-process              Shell-command adapters
naaf-qdrant               Qdrant vector database integration
naaf-knowledge            Knowledge orchestration
naaf-tui                  Terminal UI for workflow observation
naaf-persistence-fs       Filesystem checkpoint persistence
naaf-persistence-sqlite   SQLite checkpoint persistence
naaf-cli                  CLI for knowledge base operations

Quick Example

#![allow(unused)]
fn main() {
use naaf_core::{Check, RetryPolicy, Step, Task};

// Define your domain types
struct GeneratePatch;
struct CargoTest;
struct RepairPatch;

// Build a step with validation and automatic repair
let step = Step::builder(GeneratePatch)
    .validate(CargoTest)
    .repair_with(RepairPatch)
    .retry_policy(RetryPolicy::new(3))
    .build();

// Run the step (assumes a `runtime` and `prompt` are in scope)
let result = step.run(&runtime, &prompt).await?;
}

Getting Started

This guide walks you through installing naaf and running your first workflow.

Installation

Add the crates you need to your Cargo.toml:

[dependencies]
naaf-core = "0.1.0"    # Core traits and Step builder
naaf-llm = "0.1.0"     # LLM-backed Task, Check, Materialiser, RepairPlanner
naaf-process = "0.1.0"  # Shell-command-backed adapters

The naaf-llm crate has an optional openai feature for the built-in OpenAI client:

naaf-llm = { version = "0.1.0", features = ["openai"] }

Your First Workflow

This example creates a step that generates a code patch, validates it with cargo test, and retries on failure.

Dependencies

[dependencies]
naaf-core = "0.1.0"
naaf-process = "0.1.0"
tokio = { version = "1", features = ["full"] }
async-trait = "0.1"
serde = { version = "1", features = ["derive"] }

Code

use naaf_core::{Check, RetryPolicy, Step, Task};
use naaf_process::ProcessAgent;
use serde::{Deserialize, Serialize};
use std::process::Stdio;
use tokio::process::Command;

#[derive(Debug, Clone, Serialize, Deserialize)]
struct Patch(String);

#[derive(Debug, Clone, Serialize, Deserialize)]
struct PatchFindings {
    failed: Vec<String>,
}

struct GeneratePatch;

#[async_trait::async_trait]
impl Task for GeneratePatch {
    type Input = String;
    type Output = Patch;
    type Error = std::io::Error;

    async fn run(&self, _: &naaf_core::Runtime, input: &String) -> Result<Patch, Self::Error> {
        Ok(Patch(format!("// Generated patch for: {}\nfn solution() {{ }}\n", input)))
    }
}

struct ValidatePatch;

#[async_trait::async_trait]
impl Check for ValidatePatch {
    type Subject = Patch;
    type Findings = PatchFindings;
    type Error = std::io::Error;

    async fn check(
        &self,
        _: &naaf_core::Runtime,
        subject: &Patch,
    ) -> Result<Vec<Self::Findings>, Self::Error> {
        let output = Command::new("sh")
            .arg("-c")
            .arg(format!("echo '{}' | cargo check --message-format=json 2>&1 || true", subject.0))
            .stdout(Stdio::piped())
            .output()
            .await?;

        let output_str = String::from_utf8_lossy(&output.stdout);
        if output_str.contains("error") {
            Ok(vec![PatchFindings {
                failed: vec![output_str.to_string()],
            }])
        } else {
            Ok(vec![])
        }
    }
}

struct RepairPatch;

#[async_trait::async_trait]
impl naaf_core::RepairPlanner for RepairPatch {
    type Input = String;
    type Output = String;
    type Findings = PatchFindings;
    type Error = std::io::Error;

    async fn repair(
        &self,
        _: &naaf_core::Runtime,
        attempts: &[naaf_core::Attempt<PatchFindings, String>],
    ) -> Result<String, Self::Error> {
        let last_input = attempts.last().map(|a| a.input().as_str()).unwrap_or("");
        let findings = attempts.last().map(|a| a.findings().to_vec()).unwrap_or_default();

        Ok(format!("retry with: {} failures: {:?}", last_input, findings))
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let runtime = naaf_core::Runtime::new();

    let step = Step::builder(GeneratePatch)
        .validate(ValidatePatch)
        .repair_with(RepairPatch)
        .retry_policy(RetryPolicy::new(3))
        .build();

    let result = step.run(&runtime, &"test input".to_string()).await;

    match result {
        Ok(output) => println!("Success: {:?}", output),
        Err(e) => println!("Failed: {:?}", e),
    }

    Ok(())
}

Running Examples

Each example is a standalone binary in the examples/ directory:

# Run the step-retry example
cargo run -p step-retry

# Run the materialiser example
cargo run -p materialiser

# Run the dynamic-workflow example
cargo run -p dynamic-workflow

Core Concepts

naaf is built around four core traits, all driven by a shared Runtime. These traits form the building blocks of every workflow.

The Four Traits

Trait           Purpose                                         Key Method
Task            Produces an artefact from input                 run(&self, runtime, input)
Check           Validates a subject, returns findings           check(&self, runtime, subject)
Materialiser    Transforms output (often with side effects)     materialise(&self, runtime, input)
RepairPlanner   Produces the next input from failed attempts    repair(&self, runtime, attempts)

Each trait is async, takes a runtime reference for capabilities, and returns a strongly typed result. No stringly-typed dictionaries — your Rust types flow through the entire pipeline.

Relationship Between Traits

Input → Task → Output → Materialiser → Subject → Check → Findings
  ↑                                                 │
  └──────────────── RepairPlanner ←─────────────────┘

The core loop works as follows:

  1. A Task produces an Output
  2. A Materialiser optionally transforms the output into a Subject
  3. A Check validates the subject and returns Findings
  4. If findings are non-empty, a RepairPlanner can produce a new input for retry
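
The loop above can be sketched in plain Rust, with closures standing in for Task, Check, and RepairPlanner. This is an illustrative sketch of the control flow only, not the naaf API:

```rust
// Plain-Rust sketch of the task-check-repair loop. The closures stand in
// for Task, Check, and RepairPlanner; none of this is the naaf API.
fn run_loop<I, O, F>(
    mut input: I,
    max_attempts: usize,
    task: impl Fn(&I) -> O,
    check: impl Fn(&O) -> Vec<F>,
    repair: impl Fn(&I, &[F]) -> I,
) -> Result<O, Vec<F>> {
    let mut last_findings = Vec::new();
    for _ in 0..max_attempts {
        let output = task(&input);         // 1. the task produces an output
        let findings = check(&output);     // 2-3. (materialise and) check it
        if findings.is_empty() {
            return Ok(output);             // empty findings = accepted
        }
        input = repair(&input, &findings); // 4. plan the next input
        last_findings = findings;
    }
    Err(last_findings)                     // retries exhausted
}
```

Note the termination condition: the loop either returns the first accepted output or gives up after max_attempts with the last set of findings.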

Runtime

The Runtime provides shared capabilities to all traits:

#![allow(unused)]
fn main() {
use naaf_core::Runtime;

let runtime = Runtime::new();
}

You can extend the runtime with capabilities:

#![allow(unused)]
fn main() {
use naaf_llm::LlmClient;
use naaf_qdrant::QdrantClient;

// Extend with LLM and vector database capabilities
let runtime = Runtime::new()
    .with_llm(LlmClient::new())
    .with_vector_db(QdrantClient::new(url, api_key));
}

Guide to This Section

  • Task — Produces output from input
  • Check — Validates output, returns findings
  • Materialiser — Transforms output between stages
  • RepairPlanner — Plans retry input from failures
  • Step — Combines Task, Check, Materialiser, and RepairPlanner

Task

The Task trait produces an artefact from input. It’s the starting point of every workflow.

Definition

#![allow(unused)]
fn main() {
#[async_trait::async_trait]
pub trait Task: Send + Sync {
    type Input;
    type Output;
    type Error;

    async fn run(&self, runtime: &Runtime, input: &Self::Input) -> Result<Self::Output, Self::Error>;
}
}

Parameters

  • Input — The type of data this task consumes
  • Output — The type of data this task produces
  • Error — The error type this task may return

Example

use naaf_core::{Runtime, Task};

struct GenerateCode;

#[async_trait::async_trait]
impl Task for GenerateCode {
    type Input = String;
    type Output = String;
    type Error = std::io::Error;

    async fn run(&self, _: &Runtime, input: &String) -> Result<String, std::io::Error> {
        Ok(format!("fn solution() {{ // {}\n    unimplemented!()\n}}", input))
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let runtime = Runtime::new();
    let task = GenerateCode;

    let output = task.run(&runtime, &"add two numbers".to_string()).await?;
    println!("{}", output);

    Ok(())
}

Using Runtime Capabilities

The runtime provides access to shared capabilities:

#![allow(unused)]
fn main() {
use naaf_core::{Runtime, Task};
use naaf_llm::LlmClient;

struct LlmTask {
    prompt: String,
}

#[async_trait::async_trait]
impl Task for LlmTask {
    type Input = String;
    type Output = String;
    type Error = std::io::Error;

    async fn run(&self, runtime: &Runtime, input: &String) -> Result<String, std::io::Error> {
        let llm = runtime.llm().ok_or_else(|| std::io::Error::new(
            std::io::ErrorKind::NotFound,
            "LLM not configured"
        ))?;

        let full_prompt = format!("{}: {}", self.prompt, input);
        let response = llm.complete(&full_prompt).await?;
        Ok(response)
    }
}
}

Task Combinators

Tasks can be wrapped and extended using the TaskExt extension trait:

#![allow(unused)]
fn main() {
use naaf_core::TaskExt;

// Wrap a task with observability
let observed = my_task.observed();

// Wrap with a custom name
let named = my_task.observed_as("generate_code");
}

See Also

  • Check — Validates task output
  • Materialiser — Transforms task output
  • Step — Combines Task with Check and RepairPlanner

Check

The Check trait validates a subject and returns findings. An empty findings list means the check passed.

Definition

#![allow(unused)]
fn main() {
#[async_trait::async_trait]
pub trait Check: Send + Sync {
    type Subject;
    type Findings;
    type Error;

    async fn check(
        &self,
        runtime: &Runtime,
        subject: &Self::Subject,
    ) -> Result<Vec<Self::Findings>, Self::Error>;
}
}

Parameters

  • Subject — The type of data this check validates
  • Findings — The type of findings this check may return
  • Error — The error type this check may return

Key Concept: Empty Findings

An empty findings list (vec![]) means the subject passed validation. Any non-empty list indicates failure.

Example

#![allow(unused)]
fn main() {
use naaf_core::{Check, Runtime};
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Serialize, Deserialize)]
struct ValidationError {
    message: String,
    line: Option<u32>,
}

struct SyntaxCheck;

#[async_trait::async_trait]
impl Check for SyntaxCheck {
    type Subject = String;
    type Findings = ValidationError;
    type Error = std::io::Error;

    async fn check(
        &self,
        _: &Runtime,
        subject: &String,
    ) -> Result<Vec<ValidationError>, std::io::Error> {
        let mut findings = vec![];

        for (i, line) in subject.lines().enumerate() {
            if line.contains("unimplemented!()") && !line.starts_with("//") {
                findings.push(ValidationError {
                    message: "Unimplemented code found".to_string(),
                    line: Some(i as u32 + 1),
                });
            }
        }

        Ok(findings)
    }
}
}

Chaining Checks

Multiple checks form a validation pipeline:

#![allow(unused)]
fn main() {
let step = Step::builder(GenerateCode)
    .validate(SyntaxCheck)
    .validate(SecurityCheck)
    .validate(PerformanceCheck)
    .build();
}

Checks run in the order they are added. The step only passes if all checks return empty findings.

Using Findings in Repair

Findings feed directly into the repair process:

#![allow(unused)]
fn main() {
use naaf_core::RepairPlanner;

struct RepairCode;

#[async_trait::async_trait]
impl RepairPlanner for RepairCode {
    type Input = String;
    type Output = String;
    type Findings = ValidationError;
    type Error = std::io::Error;

    async fn repair(
        &self,
        _: &Runtime,
        attempts: &[naaf_core::Attempt<ValidationError, String>],
    ) -> Result<String, Self::Error> {
        let findings = attempts.last()
            .map(|a| a.findings().to_vec())
            .unwrap_or_default();

        let mut instructions = String::new();
        for finding in &findings {
            instructions.push_str(&format!("// TODO: {}\n", finding.message));
        }

        Ok(format!("// Repair based on {} findings\n{}",
            findings.len(), instructions))
    }
}
}

See Also

  • Task — Produces the subject checked
  • Materialiser — Transforms output before checking
  • Step — Combines Task with Check and RepairPlanner

Materialiser

The Materialiser trait transforms task output into a different type, often with side effects. It’s positioned between the task and validation stages.

Definition

#![allow(unused)]
fn main() {
#[async_trait::async_trait]
pub trait Materialiser: Send + Sync {
    type Input;
    type Output;
    type Error;

    async fn materialise(
        &self,
        runtime: &Runtime,
        input: &Self::Input,
    ) -> Result<Self::Output, Self::Error>;
}
}

Parameters

  • Input — The type this materialiser consumes (typically Task’s output)
  • Output — The type this materialiser produces (the Check’s subject)
  • Error — The error type this materialiser may return

When to Use Materialiser

  • Type transformation — Convert LLM output to executable code
  • File operations — Write task output to disk before validation
  • API calls — Transform output and submit to external services
  • Resource acquisition — Allocate resources needed for validation

Example: Write to File

#![allow(unused)]
fn main() {
use naaf_core::{Materialiser, Runtime};
use std::path::PathBuf;

struct WriteToFile {
    path: PathBuf,
}

#[async_trait::async_trait]
impl Materialiser for WriteToFile {
    type Input = String;
    type Output = PathBuf;
    type Error = std::io::Error;

    async fn materialise(
        &self,
        _: &Runtime,
        input: &String,
    ) -> Result<PathBuf, std::io::Error> {
        tokio::fs::write(&self.path, input).await?;
        Ok(self.path.clone())
    }
}
}

Usage in a step:

#![allow(unused)]
fn main() {
use naaf_core::Step;

let step = Step::builder(GenerateCode)
    .materialise(WriteToFile { path: PathBuf::from("/tmp/output.rs") })
    .validate(CheckCompiled)
    .build();
}

Example: Transform to Executable Format

#![allow(unused)]
fn main() {
use naaf_core::{Materialiser, Runtime};

struct ParseToJson;

#[async_trait::async_trait]
impl Materialiser for ParseToJson {
    type Input = String;  // Raw LLM output
    type Output = serde_json::Value;  // Parsed JSON
    type Error = serde_json::Error;

    async fn materialise(
        &self,
        _: &Runtime,
        input: &String,
    ) -> Result<serde_json::Value, serde_json::Error> {
        serde_json::from_str(input)
    }
}
}

The materialiser transforms raw string output into structured JSON that the check can validate.

Multiple Materialisers

Chain materialisers when multiple transformations are needed:

#![allow(unused)]
fn main() {
let step = Step::builder(GenerateConfig)
    .materialise(ParseConfig)           // String → Value
    .materialise(ValidateSchema)       // Value → ValidatedValue
    .materialise(WriteToFile)          // ValidatedValue → PathBuf
    .validate(CheckFileExists)
    .build();
}

Each materialiser’s output type must match the next materialiser’s input type.

See Also

  • Task — Produces the input for materialisation
  • Check — Validates the materialised output
  • Step — Combines all stages

RepairPlanner

The RepairPlanner trait produces a new input from failed attempts. It’s the intelligence behind the retry loop.

Definition

#![allow(unused)]
fn main() {
#[async_trait::async_trait]
pub trait RepairPlanner: Send + Sync {
    type Input;
    type Output;
    type Findings;
    type Error;

    async fn repair(
        &self,
        runtime: &Runtime,
        attempts: &[Attempt<Self::Findings, Self::Input>],
    ) -> Result<Self::Output, Self::Error>;
}
}

Parameters

  • Input — The type of input that started the attempt
  • Output — The new input to retry with
  • Findings — The type of findings from failed checks
  • Error — The error type this planner may return

Accessing Attempt History

The repair method receives all previous attempts, giving full context for repair decisions:

#![allow(unused)]
fn main() {
use naaf_core::{Attempt, RepairPlanner, Runtime};

struct LlmRepairPlanner {
    system_prompt: String,
}

#[async_trait::async_trait]
impl RepairPlanner for LlmRepairPlanner {
    type Input = String;
    type Output = String;
    type Findings = ValidationError;
    type Error = std::io::Error;

    async fn repair(
        &self,
        runtime: &Runtime,
        attempts: &[Attempt<ValidationError, String>],
    ) -> Result<String, Self::Error> {
        if attempts.is_empty() {
            return Err(std::io::Error::new(std::io::ErrorKind::Other, "No attempts"));
        }

        let history = attempts
            .iter()
            .enumerate()
            .map(|(i, a)| {
                format!("Attempt {}:\nInput: {}\nFindings: {:?}",
                    i + 1, a.input(), a.findings())
            })
            .collect::<Vec<_>>()
            .join("\n---\n");

        let prompt = format!(
            "{}\n\nPrevious attempts:\n{}\n\nProduce a new input:",
            self.system_prompt, history
        );

        let llm = runtime.llm().ok_or_else(|| {
            std::io::Error::new(std::io::ErrorKind::NotFound, "LLM not configured")
        })?;

        let response = llm.complete(&prompt).await?;
        Ok(response)
    }
}
}

Attempt Structure

Each Attempt<F, I> contains:

Method       Description
input()      The original input for this attempt
output()     The task’s output
subject()    The materialised subject (if any)
findings()   The findings from checks
accepted()   Whether the step passed validation

Repair Strategies

Simple retry with modified input

#![allow(unused)]
fn main() {
struct IncrementRetry;

#[async_trait::async_trait]
impl RepairPlanner for IncrementRetry {
    type Input = u32;
    type Output = u32;
    type Findings = ();
    type Error = std::io::Error;

    async fn repair(
        &self,
        _: &Runtime,
        attempts: &[Attempt<(), u32>],
    ) -> Result<u32, Self::Error> {
        // Next input is simply one more than the number of attempts so far
        Ok(attempts.len() as u32 + 1)
    }
}
}

LLM-based repair

Uses an LLM to analyze failures and generate better input:

#![allow(unused)]
fn main() {
struct SmartRepair {
    model: String,
    system_prompt: String,
}

#[async_trait::async_trait]
impl RepairPlanner for SmartRepair {
    type Input = String;
    type Output = String;
    type Findings = ValidationError;
    type Error = std::io::Error;

    async fn repair(
        &self,
        runtime: &Runtime,
        attempts: &[Attempt<ValidationError, String>],
    ) -> Result<String, Self::Error> {
        // Analyze findings and ask the LLM for a new input;
        // build_repair_prompt is a user-supplied helper.
        let llm = runtime.llm().expect("LLM configured");
        let response = llm.complete(&build_repair_prompt(&self.system_prompt, attempts)).await?;
        Ok(response)
    }
}
}

Enabling Retry

A step only retries when a RepairPlanner is attached:

#![allow(unused)]
fn main() {
let step = Step::builder(GenerateCode)
    .validate(SyntaxCheck)
    .repair_with(LlmRepairPlanner { system_prompt: "Fix the code".to_string() })
    .retry_policy(RetryPolicy::new(3))  // Allow up to 3 attempts
    .build();
}

Without .repair_with(), the step does not retry — it returns immediately after the first failure.

See Also

  • Step — Combines Task, Check, Materialiser, and RepairPlanner
  • RetryPolicy — Configures retry behavior
  • LLM Integration — Use LLMs for repair planning

Step

A Step owns one task plus its validation pipeline and optional repair loop. The builder pattern enforces type safety at compile time.

Creating a Step

#![allow(unused)]
fn main() {
use naaf_core::{Check, RetryPolicy, Step, Task};

let step = Step::builder(GenerateCode)
    .validate(SyntaxCheck)
    .validate(SecurityCheck)
    .repair_with(RepairCode)
    .retry_policy(RetryPolicy::new(3))
    .build();
}

Builder Methods

Method                       Description                    Returns
Step::builder(task)          Start building with a task     StepBuilder
.validate(check)             Add a check to the pipeline    &mut StepBuilder
.materialise(materialiser)   Add a materialiser             &mut StepBuilder
.repair_with(planner)        Enable retry on failure        &mut StepBuilder
.retry_policy(policy)        Configure max attempts         &mut StepBuilder
.build()                     Produce a runnable step        Step

Type Safety

The builder enforces that types are compatible:

#![allow(unused)]
fn main() {
// This won't compile - type mismatch:
// Task output (String) doesn't match Check subject (u32)
Step::builder(GenerateString)
    .validate(ValidateNumber)  // Error!
    .build();
}

Retry Policy

#![allow(unused)]
fn main() {
use naaf_core::RetryPolicy;

// Default: 1 attempt (no retry)
let policy = RetryPolicy::default();

// Custom: up to N attempts
let policy = RetryPolicy::new(3);

// Exponential backoff
let policy = RetryPolicy::new(3).with_backoff(
    tokio::time::Duration::from_secs(1),
    tokio::time::Duration::from_secs(10),
);
}

Running a Step

Basic run

#![allow(unused)]
fn main() {
let output = step.run(&runtime, &input).await?;
}

Run with tracing

#![allow(unused)]
fn main() {
use naaf_core::TracedOutput;

let traced = step.run_traced(&runtime, &input).await?;
println!("Output: {:?}", traced.output());
println!("Attempts: {}", traced.report().attempt_count());

for attempt in traced.report().attempts() {
    println!("  accepted={}, findings={:?}", attempt.accepted(), attempt.findings());
}
}

Error Handling

Steps produce a StepError<F, E>:

#![allow(unused)]
fn main() {
use naaf_core::StepError;

match result {
    Ok(output) => println!("Success: {:?}", output),
    Err(StepError::System { stage, error }) => {
        eprintln!("Infrastructure failure at {}: {:?}", stage, error);
    }
    Err(StepError::Rejected(report)) => {
        eprintln!("Exhausted retries after {} attempts", report.attempt_count());
        for attempt in report.attempts() {
            eprintln!("  - findings: {:?}", attempt.findings());
        }
    }
}
}

Step Outputs

A Step can be used as a Task, Check, or Materialiser in other contexts:

#![allow(unused)]
fn main() {
use naaf_core::{Check, Task};

// Using a step as a Task in another step
let composed = Step::builder(step)  // step implements Task
    .validate(FinalCheck)
    .build();
}

Finding Types

Steps track findings throughout execution. Access them via the report:

#![allow(unused)]
fn main() {
let traced = step.run_traced(&runtime, &input).await?;
let report = traced.report();

println!("Total attempts: {}", report.attempt_count());
println!("Final state: {:?}", report.final_state());
}

Building Workflows

Workflows compose individual steps into larger patterns. naaf provides three composition models:

  1. Sequential — Output from one step feeds into the next
  2. Parallel — Steps run concurrently, then reconcile
  3. Dynamic — Runtime-determined topology with Workflow

Sequential Workflows

The simplest pattern: chain steps where output flows to input.

#![allow(unused)]
fn main() {
use naaf_core::Step;

// Sequential: A → B → C
let workflow = step_a
    .then(step_b)
    .then(step_c);
}

The output type of each step must match the input type of the next.
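
This type discipline is ordinary function composition. As a plain-Rust sketch (illustrative only, not the naaf API), a then-style chain only compiles when the first stage's output type is exactly the second stage's input type:

```rust
// Illustrative sketch, not the naaf API: `then`-style chaining is function
// composition, so it only type-checks when B (the first stage's output)
// is exactly the second stage's input.
fn then<A, B, C>(first: impl Fn(A) -> B, second: impl Fn(B) -> C) -> impl Fn(A) -> C {
    move |a| second(first(a))
}
```

For example, a `String → usize` stage can feed a `usize → bool` stage, but not a `PathBuf → bool` one; the compiler rejects the mismatch at the call site.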

Parallel Workflows

Run multiple steps concurrently:

#![allow(unused)]
fn main() {
// Fan-out: same input, both run
let both = step_a.join(step_b);

// Fan-out: different inputs
let pair = step_a.zip(step_b);

// Fan-in: reconcile parallel outputs
let merged = both.reconcile_task(MergeResults);
}

Dynamic Workflows

For runtime-determined topologies, use Workflow with nodes:

#![allow(unused)]
fn main() {
use naaf_core::{StepNode, Workflow, NodeSpec, EdgeSpec, GraphPatch};

// Create typed step nodes
let plan_node = StepNode::new(
    Step::builder(PlanProject).with_findings::<()>().build(),
    |input: &NodeInput| input.seed_as::<ProjectBrief>(),
);

// Nodes can spawn downstream work
let root = StepNode::new(
    Step::builder(root_step).with_findings::<()>().build(),
    |input: &NodeInput| input.seed_as::<Input>(),
).spawn_with(|context, _output| {
    GraphPatch::new()
        .with_node(node_a)
        .with_node(node_b)
        .with_edge(EdgeSpec::new(context.node_id, node_a_id))
});

let workflow = Workflow::new()
    .with_patch(GraphPatch::new().with_node(root_spec))?;

let report = workflow.run(&runtime).await?;
}

Sequential Composition

Sequential composition chains steps where output flows from one to the next.

The .then() Combinator

#![allow(unused)]
fn main() {
let workflow = step_a.then(step_b);
}

The output type of step_a must match the input type of step_b.

Example

#![allow(unused)]
fn main() {
use naaf_core::Step;

// A: String → Code
let generate = Step::builder(GenerateCode)
    .validate(SyntaxCheck)
    .build();

// B: Code → TestResults
let test = Step::builder(RunTests)
    .validate(TestResultsCheck)
    .build();

// C: TestResults → FinalOutput
let format = Step::builder(FormatOutput)
    .validate(FinalCheck)
    .build();

// Chain: String → Code → TestResults → FinalOutput
let pipeline = generate.then(test).then(format);

let output = pipeline.run(&runtime, &"build a counter".to_string()).await?;
}

Type Conversion

Use Materialiser for type conversion:

#![allow(unused)]
fn main() {
use naaf_core::Materialiser;

// GenerateCode returns a String, but run_tests expects a PathBuf
let generate_and_save = Step::builder(GenerateCode)
    .materialise(WriteToFile)  // String → PathBuf
    .validate(FileCheck)
    .build();

// generate_and_save now yields a PathBuf, so the types line up
let pipeline = generate_and_save.then(run_tests);
}

Short-circuit Evaluation

The pipeline stops on first error:

#![allow(unused)]
fn main() {
let result = pipeline.run(&runtime, &input).await;

match result {
    Ok(output) => println!("Success: {:?}", output),
    Err(e) => {
        // Execution stopped at the failing step
        eprintln!("Pipeline failed: {:?}", e);
    }
}
}

With Retry

Each step in the pipeline can have its own retry policy:

#![allow(unused)]
fn main() {
use naaf_core::RetryPolicy;

let pipeline = Step::builder(GenerateCode)  // No retry
    .validate(SyntaxCheck)
    .build()
    .then(
        Step::builder(RunTests)  // With retry
            .validate(TestCheck)
            .repair_with(TestRepairer)
            .retry_policy(RetryPolicy::new(3))
            .build()
    );
}

Parallel Composition

Parallel composition runs multiple steps concurrently and reconciles their results.

Combinators

Combinator              Description
.join(other)            Same input, both run, output is a tuple
.zip(other)             Different inputs, both run concurrently
.reconcile_task(task)   Fan-in: reconcile parallel outputs
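
The data flow of these combinators can be sketched over plain functions. This is an illustrative model of the semantics, not the naaf API:

```rust
// Illustrative semantics of the combinators over plain functions;
// this is not the naaf API.

// .join(other): one input is cloned and fanned out to both branches.
fn join<I: Clone, A, B>(f: impl Fn(I) -> A, g: impl Fn(I) -> B) -> impl Fn(I) -> (A, B) {
    move |i| (f(i.clone()), g(i))
}

// .zip(other): each branch receives its own input.
fn zip<I1, I2, A, B>(f: impl Fn(I1) -> A, g: impl Fn(I2) -> B) -> impl Fn((I1, I2)) -> (A, B) {
    move |(i1, i2)| (f(i1), g(i2))
}

// .reconcile_task(task): fan the paired outputs back into one value.
fn reconcile<A, B, C>(pair: (A, B), merge: impl Fn(A, B) -> C) -> C {
    merge(pair.0, pair.1)
}
```

The difference between join and zip is entirely in the input side: join duplicates one input, zip pairs two independent ones; both produce a tuple that reconcile collapses.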

.join() — Same Input

Both steps receive the same input and run concurrently:

#![allow(unused)]
fn main() {
let both = step_a.join(step_b);

// Input: String
// Output: (OutputA, OutputB)
let result = both.run(&runtime, &input).await?;
}

.zip() — Different Inputs

Steps receive different inputs:

#![allow(unused)]
fn main() {
use naaf_core::Step;

let generate_code = Step::builder(GenerateCode).build();
let generate_tests = Step::builder(GenerateTests).build();

let zipped = generate_code.zip(generate_tests);

// Input: (CodeRequest, TestRequest)
// Output: (Code, Tests)
let result = zipped.run(&runtime, &(
    "build a counter".to_string(),
    "test the counter".to_string(),
)).await?;
}

.reconcile_task() — Fan-in

Reconcile parallel outputs with a task:

#![allow(unused)]
fn main() {
use naaf_core::Step;

let branch_a = Step::builder(GenerateCode).build();
let branch_b = Step::builder(GenerateTests).build();

let merged = branch_a
    .join(branch_b)
    .reconcile_task(MergeCodeAndTests);

// Input: Request
// Output: MergedOutput
let result = merged.run(&runtime, &request).await?;
}

Example: Fan-out/Fan-in

#![allow(unused)]
fn main() {
use naaf_core::Step;

// Fan-out: same request to three generators
let search = Step::builder(SearchDocs).build();
let code = Step::builder(GenerateCode).build();
let test = Step::builder(GenerateTests).build();

let branches = search.join(code).join(test);

// Fan-in: reconcile the three results
let pipeline = branches.reconcile_task(CombineResults);

// Run concurrently
let output = pipeline.run(&runtime, &query).await?;
}

Concurrency

Steps in .join() and .zip() run concurrently via the runtime’s executor:

#![allow(unused)]
fn main() {
use naaf_core::Runtime;

let runtime = Runtime::new();  // Uses tokio runtime

// These run in parallel:
let result = combined.run(&runtime, &input).await;
}

Error Handling

If any branch fails, the whole pipeline fails:

#![allow(unused)]
fn main() {
match result {
    Ok((a, b)) => println!("Both succeeded: {:?}, {:?}", a, b),
    Err(e) => {
        // One or both branches failed
        eprintln!("Parallel execution failed: {:?}", e);
    }
}
}

Partial results are not preserved — all branches must succeed.
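
The all-or-nothing behaviour mirrors combining two Results, sketched here in plain Rust (not the naaf API): a successful branch's output is discarded whenever the sibling branch errors.

```rust
// Illustrative, not the naaf API: joining two branch results is
// all-or-nothing, like applying `?` to each half of the pair.
fn join_results<A, B, E>(a: Result<A, E>, b: Result<B, E>) -> Result<(A, B), E> {
    Ok((a?, b?))
}
```

Note that `(a?, b?)` evaluates left to right, so if both branches fail, the first branch's error is the one reported.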

With Retry

Each branch can have its own retry policy:

#![allow(unused)]
fn main() {
use naaf_core::RetryPolicy;

let robust = Step::builder(SearchDocs)
    .validate(DocCheck)
    .repair_with(DocRepairer)
    .retry_policy(RetryPolicy::new(3))
    .build()
    .join(
        Step::builder(GenerateCode)
            .validate(CodeCheck)
            .repair_with(CodeRepairer)
            .retry_policy(RetryPolicy::new(5))
            .build()
    );
}

Dynamic Workflows

Dynamic workflows construct their topology at runtime using Workflow, StepNode, and GraphPatch.

Core Types

Type         Purpose
StepNode     A node in the workflow graph
NodeSpec     Defines a step’s requirements
EdgeSpec     Defines connections between nodes
GraphPatch   Adds nodes/edges at runtime
Workflow     Executes the directed acyclic graph

Creating Nodes

#![allow(unused)]
fn main() {
use naaf_core::{StepNode, NodeSpec, NodeInput};

let node = StepNode::new(
    Step::builder(MyTask)
        .validate(MyCheck)
        .build(),
    |input: &NodeInput| input.seed_as::<MyInput>(),
);
}

Each node has:

  • A step to execute
  • A closure that extracts/seeds input from node input

Spawning Dynamic Nodes

Nodes can spawn new nodes at runtime:

#![allow(unused)]
fn main() {
let root = StepNode::new(
    Step::builder(RootStep).build(),
    |input: &NodeInput| input.seed_as::<RootInput>(),
).spawn_with(|context, output| {
    // Analyze output and decide what nodes to spawn
    match output.needs_subtasks() {
        true => GraphPatch::new()
            .with_node(child_node_a)
            .with_node(child_node_b)
            .with_edge(EdgeSpec::new(context.node_id, child_node_a_id)),
        false => GraphPatch::new(),
    }
});
}

Building the Workflow

#![allow(unused)]
fn main() {
use naaf_core::{Workflow, GraphPatch};

let workflow = Workflow::new()
    .with_patch(GraphPatch::new()
        .with_node(root_spec)
        .with_node(node_a)
        .with_node(node_b)
        .with_edge(EdgeSpec::new(root_id, node_a_id))
    )?;

let report = workflow.run(&runtime).await?;
}

Scheduling

The runtime schedules nodes when their dependencies complete:

  • Nodes with no dependencies run immediately
  • Nodes with satisfied dependencies run in parallel
  • Downstream patches are applied when parent nodes complete
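
The readiness rule above can be sketched as a round-based scheduler in plain Rust. This is an illustrative model of the scheduling policy, not the naaf runtime:

```rust
use std::collections::{HashMap, HashSet};

// Illustrative scheduler sketch, not the naaf API: each round runs every
// node whose dependencies have all completed; nodes in the same round
// could execute in parallel.
fn schedule<'a>(deps: &HashMap<&'a str, Vec<&'a str>>) -> Vec<Vec<&'a str>> {
    let mut done: HashSet<&str> = HashSet::new();
    let mut rounds = Vec::new();
    while done.len() < deps.len() {
        let mut ready: Vec<&'a str> = deps
            .iter()
            .filter(|(node, needs)| {
                !done.contains(**node) && needs.iter().all(|d| done.contains(d))
            })
            .map(|(node, _)| *node)
            .collect();
        if ready.is_empty() {
            break; // remaining nodes form a cycle and can never run
        }
        ready.sort(); // deterministic order for display only
        done.extend(ready.iter().copied());
        rounds.push(ready);
    }
    rounds
}
```

A diamond graph (root → a, root → b, then a and b → merge) schedules as three rounds, with a and b eligible to run concurrently in the middle round.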

Node Input

Each node receives a NodeInput:

#![allow(unused)]
fn main() {
fn extract_input(input: &NodeInput) -> MyType {
    // Get from the seed…
    input.seed_as::<MyType>()
    // …or from a previous node's output:
    // input.get_output::<PreviousOutput>(node_id)
}
}

Example: Dynamic Task Generation

#![allow(unused)]
fn main() {
use naaf_core::{StepNode, Workflow, GraphPatch, NodeInput, EdgeSpec};

let plan_node = StepNode::new(
    Step::builder(PlanTasks).build(),
    |input: &NodeInput| input.seed_as::<ProjectBrief>(),
).spawn_with(|context, plan| {
    let mut patch = GraphPatch::new();

    // One node per planned task; in a real workflow the task data
    // would seed the child node's input
    for _task in plan.tasks.iter() {
        let task_node = StepNode::new(
            Step::builder(ExecuteTask)
                .validate(TaskCheck)
                .build(),
            |input: &NodeInput| input.seed_as::<TaskInput>(),
        );
        // task_node_id stands in for the id of the node just added
        patch = patch
            .with_node(task_node)
            .with_edge(EdgeSpec::new(context.node_id, task_node_id));
    }

    patch
});

let workflow = Workflow::new()
    .with_patch(GraphPatch::new().with_node(plan_node))?;

let report = workflow.run(&runtime).await?;
}

Error Handling

The workflow report contains results from all nodes:

#![allow(unused)]
fn main() {
let report = workflow.run(&runtime).await?;

for (node_id, result) in report.node_results() {
    match result {
        Ok(output) => println!("Node {}: {:?}", node_id, output),
        Err(e) => eprintln!("Node {} failed: {:?}", node_id, e),
    }
}
}

Observability

naaf provides structured tracing for debugging and monitoring workflows. All core traits have extension methods for observability.

TaskExt

Wrap any task with observability:

#![allow(unused)]
fn main() {
use naaf_core::TaskExt;

let observed = my_task.observed();           // Auto-named
let observed = my_task.observed_as("name");  // Custom name
}

CheckExt

Wrap checks:

#![allow(unused)]
fn main() {
use naaf_core::CheckExt;

let observed_check = my_check.observed();
let observed_check = my_check.observed_as("validate_output");
}

Running with Tracing

Use run_traced() to get detailed execution reports:

#![allow(unused)]
fn main() {
use naaf_core::TracedOutput;

let traced = step.run_traced(&runtime, &input).await?;

println!("Output: {:?}", traced.output());
println!("Attempts: {}", traced.report().attempt_count());

for attempt in traced.report().attempts() {
    println!("  accepted={}, findings={:?}", 
        attempt.accepted(), 
        attempt.findings()
    );
}
}

AttemptReport

Each attempt contains:

  • input() — Original input
  • output() — Task output
  • subject() — Materialised subject
  • findings() — Validation findings
  • accepted() — Whether the check passed

TracedOutput

#![allow(unused)]
fn main() {
struct TracedOutput<F, O> {
    output: O,
    report: StepReport<F>,
}
}

Methods:

  • output() — Get the final output
  • report() — Get the execution report

StepReport

#![allow(unused)]
fn main() {
struct StepReport<F> {
    attempts: Vec<AttemptReport<F>>,
}
}

Methods:

  • attempt_count() — Total attempts made
  • attempts() — Iterator over attempts
  • final_state() — Final acceptance state
  • findings() — All findings from all attempts

Structured Logging

naaf uses the tracing crate:

#![allow(unused)]
fn main() {
use tracing::{info, error, warn};

info!(step = "generate", "running step");
warn!(findings = ?findings, "validation failed, retrying");
error!(attempt = 3, "retry exhausted");
}

Configuration

Enable tracing in your runtime:

#![allow(unused)]
fn main() {
use tracing_subscriber::fmt;
use tracing_subscriber::prelude::*;

tracing_subscriber::registry()
    .with(fmt::layer())
    .init();
}
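
To control verbosity at runtime, tracing_subscriber's EnvFilter layer (behind its env-filter feature) reads RUST_LOG directives. A sketch, assuming that feature is enabled:

```rust
use tracing_subscriber::{fmt, EnvFilter};
use tracing_subscriber::prelude::*;

// Honour RUST_LOG, e.g. RUST_LOG=naaf_core=debug,info
tracing_subscriber::registry()
    .with(EnvFilter::from_default_env())
    .with(fmt::layer())
    .init();
```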

See Also

  • Examples — See observability in action

Integrations

naaf integrates with multiple backends for LLM calls, shell commands, and vector databases.

Available Integrations

  • LLM (naaf-llm) — OpenAI, Anthropic, custom providers
  • Process (naaf-process) — Shell command execution
  • Vector DB (naaf-qdrant) — Qdrant vector database
  • Knowledge (naaf-knowledge) — Knowledge base orchestration

Guide to Integrations

LLM Integration

The naaf-llm crate provides LLM-backed adapters, tool calling, and multiple provider support.

Setup

[dependencies]
naaf-llm = "0.1.0"

With OpenAI support:

naaf-llm = { version = "0.1.0", features = ["openai"] }

LlmClient

#![allow(unused)]
fn main() {
use naaf_llm::LlmClient;

let client = LlmClient::builder()
    .api_key(std::env::var("OPENAI_API_KEY")?)
    .model("gpt-4")
    .build();
}

LlmAgent

The LlmAgent provides a shared executor that can be projected into any core trait:

#![allow(unused)]
fn main() {
use naaf_llm::{LlmAgent, Message, CompletionRequest, ExecutionOutcome};

let agent = LlmAgent::new(client);

// Project into Task
let task = agent.task(
    |_, input: String| Ok(CompletionRequest::new("gpt-4", vec![Message::user(input)])),
    |outcome: ExecutionOutcome| Ok(outcome.final_message().content.clone().unwrap_or_default()),
);

// Project into Check
let check = agent.check(
    |_, subject: String| Ok(CompletionRequest::new("gpt-4", vec![Message::user(subject)])),
    |outcome: ExecutionOutcome| serde_json::from_str(&outcome.final_message().content.as_deref().unwrap_or("[]")),
);

// Project into RepairPlanner
let repair = agent.repair(
    |_, attempts: Vec<Attempt<Findings, Input>>| { ... },
    |outcome: ExecutionOutcome| Ok(outcome.final_message().content.clone().unwrap_or_default()),
);
}

Tool Calling

Define tools for the LLM to use:

#![allow(unused)]
fn main() {
use naaf_llm::{Tool, ToolRegistry, ToolSpec};

struct SearchTool;

impl Tool for SearchTool {
    type Input = String;
    type Output = Vec<SearchResult>;

    fn spec(&self) -> ToolSpec {
        ToolSpec::new("search", "Search the knowledge base")
            .add_parameter("query", "string", "The search query")
    }

    async fn execute(&self, runtime: &Runtime, input: &String) -> Result<Vec<SearchResult>, Self::Error> {
        // Execute the tool
        Ok(search(&runtime, input).await?)
    }
}

let registry = ToolRegistry::new()
    .with_tool(SearchTool);
}

Dynamic Spawning

Spawn new nodes from tool calls:

#![allow(unused)]
fn main() {
use naaf_llm::{SpawnTool, SpawnResult, resolve_spawn};

let spawn_tool = SpawnTool::new(registry);

let outcome = llm.execute_with_tools(&request, &[spawn_tool]).await?;

if let Some(SpawnResult { spawn, .. }) = outcome.tool_calls().first() {
    let patch = resolve_spawn(spawn, node_context).await?;
    workflow.apply_patch(patch)?;
}
}

Built-in Adapters

  • LlmTask — Project an LLM as a Task
  • LlmCheck — Project an LLM as a Check
  • LlmMaterialiser — Project an LLM as a Materialiser
  • LlmRepairPlanner — Project an LLM as a RepairPlanner

Providers

OpenAI

#![allow(unused)]
fn main() {
use naaf_llm::OpenAIClient;

let client = OpenAIClient::new(api_key, "gpt-4");
}

Anthropic

#![allow(unused)]
fn main() {
use naaf_llm::AnthropicClient;

let client = AnthropicClient::new(api_key, "claude-3-opus-20240229");
}

Custom Provider

Implement the LlmProvider trait for other providers:

#![allow(unused)]
fn main() {
impl LlmProvider for MyProvider {
    async fn complete(&self, request: &CompletionRequest) -> Result<CompletionResponse, Self::Error> {
        // Call your provider's completion endpoint here
        todo!()
    }

    async fn complete_with_tools(&self, request: &CompletionRequest, tools: &[Box<dyn Tool>]) -> Result<CompletionResponse, Self::Error> {
        // Same, with the tool specifications attached to the request
        todo!()
    }
}
}

Process Integration

The naaf-process crate wraps shell commands into core traits.

Setup

[dependencies]
naaf-process = "0.1.0"

ProcessAgent

#![allow(unused)]
fn main() {
use naaf_process::ProcessAgent;

let agent = ProcessAgent::new();
}

Converting Commands to Traits

ProcessTask

#![allow(unused)]
fn main() {
use naaf_process::{ProcessCommand, ProcessOutput};

let task = agent.task(
    |_, script: String| Ok(ProcessCommand::shell(script)),
    |output: ProcessOutput| String::from_utf8(output.stdout),
);
}

ProcessCheck

#![allow(unused)]
fn main() {
use naaf_process::ProcessOutput;

let check = agent.check(
    |_, subject: String| Ok(ProcessCommand::shell(format!("echo '{}' | sh", subject))),
    |output: ProcessOutput| {
        let result = String::from_utf8_lossy(&output.stdout);
        if result.contains("error") {
            vec![CheckError { message: result.to_string() }]
        } else {
            vec![]
        }
    },
);
}

ProcessMaterialiser

#![allow(unused)]
fn main() {
use naaf_process::{ProcessOutput, FilePath};

let materialiser = agent.materialiser(
    |_, content: String| {
        let path = "/tmp/script.sh";
        std::fs::write(path, &content)?;
        Ok(ProcessCommand::shell(format!("chmod +x {}", path)))
    },
    |output: ProcessOutput| Ok(FilePath::from("/tmp/script.sh")),
);
}

ProcessRepairPlanner

#![allow(unused)]
fn main() {
use naaf_core::Attempt;

let repair = agent.repair(
    |_, attempts: &[Attempt<Error, String>]| {
        let last = attempts.last().unwrap();
        let message = last.findings().first().map(|f| f.message.clone()).unwrap_or_default();
        Ok(ProcessCommand::shell(format!("# Suggestion: {}", message)))
    },
    |output: ProcessOutput| String::from_utf8(output.stdout),
);
}

ProcessCommand

#![allow(unused)]
fn main() {
use naaf_process::ProcessCommand;

// Shell command
ProcessCommand::shell("ls -la")

// Direct program with args
ProcessCommand::new("cargo", &["build", "--release"])

// With working directory
ProcessCommand::shell("cargo test")
    .cwd("/path/to/project")
}
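
For intuition, ProcessCommand::shell presumably runs the string through the platform shell and captures the result, roughly as std::process::Command does below. This is a sketch of the underlying mechanism, not naaf-process internals:

```rust
use std::process::Command;

fn main() {
    // Roughly the behaviour ProcessCommand::shell is assumed to wrap:
    // run the string via `sh -c` and capture stdout, stderr, and status.
    let output = Command::new("sh")
        .arg("-c")
        .arg("echo hello")
        .output()
        .expect("failed to spawn shell");

    assert!(output.status.success());
    println!("{}", String::from_utf8_lossy(&output.stdout).trim()); // hello
}
```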

ProcessOutput

#![allow(unused)]
fn main() {
struct ProcessOutput {
    stdout: Vec<u8>,
    stderr: Vec<u8>,
    status: ExitStatus,
}
}

Working Directory

#![allow(unused)]
fn main() {
let task = agent.task(
    |_, script: String| {
        Ok(ProcessCommand::shell(script)
            .cwd("/path/to/project"))
    },
    |output: ProcessOutput| { ... },
);
}

Environment Variables

#![allow(unused)]
fn main() {
let task = agent.task(
    |_, script: String| {
        Ok(ProcessCommand::shell(script)
            .env("RUST_LOG", "debug"))
    },
    |output: ProcessOutput| { ... },
);
}

Example: Cargo Workflow

#![allow(unused)]
fn main() {
use naaf_core::Step;

let generate = Step::builder(GenerateCode).build();

let cargo_check = agent.task(
    |_, code: String| Ok(ProcessCommand::shell(format!("echo '{}' | cargo check 2>&1", code))),
    |output: ProcessOutput| {
        let err = String::from_utf8_lossy(&output.stderr);
        if err.contains("error") || err.contains("warning") {
            Err(StepError::Rejected(vec![Findings { message: err.to_string() }]))
        } else {
            Ok(code)
        }
    },
);

let workflow = generate.then(cargo_check);
}

Knowledge Base

The naaf-knowledge crate provides knowledge orchestration with Qdrant vector database.

Setup

[dependencies]
naaf-knowledge = "0.1.0"
naaf-qdrant = "0.1.0"

Architecture

User Input    → KnowledgeIngest → Qdrant
Query Request → KnowledgeQuery  → Qdrant → LLM Context
Lint Request  → KnowledgeLint   → Qdrant → LLM Feedback

QdrantClient

#![allow(unused)]
fn main() {
use naaf_qdrant::QdrantClient;

let client = QdrantClient::new(
    "http://localhost:6333",
    Some("api-key"),
);
}

Knowledge Operations

Ingest

#![allow(unused)]
fn main() {
use naaf_knowledge::{IngestRequest, IngestResponse};

let request = IngestRequest::new()
    .with_documents(docs)
    .with_chunking_config(ChunkingConfig::default()
        .chunk_size(1024)
        .overlap(128))
    .with_embeddings(embeddings_config);

let response = knowledge.ingest(&runtime, request).await?;
}

Query

#![allow(unused)]
fn main() {
use naaf_knowledge::{QueryRequest, QueryResponse};

let request = QueryRequest::new()
    .with_query("how do I build a step?")
    .with_limit(5)
    .with_threshold(0.7);

let response = knowledge.query(&runtime, request).await?;

for result in response.results() {
    println!("Score: {:.2}, Content: {}", result.score(), result.content());
}
}

Lint

#![allow(unused)]
fn main() {
use naaf_knowledge::{LintRequest, LintResponse};

let request = LintRequest::new()
    .with_code(code)
    .with_context(context_docs);

let response = knowledge.lint(&runtime, request).await?;
}

Chunking Strategies

Fixed Size

#![allow(unused)]
fn main() {
use naaf_qdrant::ChunkingConfig;

ChunkingConfig::default()
    .chunk_size(1024)
    .overlap(128)
}
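
The chunk_size/overlap semantics can be illustrated in plain Rust. This is not the naaf-qdrant implementation — just the sliding-window behaviour the two knobs describe:

```rust
// Fixed-size chunking: windows of `chunk_size` characters, where
// consecutive chunks share `overlap` characters.
fn chunk(text: &str, chunk_size: usize, overlap: usize) -> Vec<String> {
    assert!(overlap < chunk_size);
    let chars: Vec<char> = text.chars().collect();
    let step = chunk_size - overlap;
    let mut chunks = Vec::new();
    let mut start = 0;
    while start < chars.len() {
        let end = (start + chunk_size).min(chars.len());
        chunks.push(chars[start..end].iter().collect());
        if end == chars.len() {
            break;
        }
        start += step;
    }
    chunks
}

fn main() {
    // chunk_size = 8, overlap = 2: adjacent chunks share 2 characters
    let chunks = chunk("abcdefghijklmnop", 8, 2);
    println!("{:?}", chunks); // ["abcdefgh", "ghijklmn", "mnop"]
}
```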

Semantic

Split by paragraphs, sections, or code blocks:

#![allow(unused)]
fn main() {
ChunkingConfig::semantic()
    .separators(&["\n\n", "\n", " "])
    .min_chunk_size(100)
    .max_chunk_size(2048)
}

Embeddings

#![allow(unused)]
fn main() {
use naaf_qdrant::{EmbeddingAdapter, EmbeddingRequest};

let adapter = naaf_qdrant::OpenAIEmbeddings::new(client.clone(), "text-embedding-ada-002");

let request = EmbeddingRequest::new()
    .with_texts(texts);

let embeddings = adapter.embed(&runtime, request).await?;
}

Collections

#![allow(unused)]
fn main() {
use naaf_qdrant::CreateCollection;

// `client` is the QdrantClient constructed above
client.create_collection("my-collection").await?;

let collection = client.collection("my-collection");
}

CLI

The naaf-cli crate provides a CLI for knowledge base operations:

# Ingest documents
naaf kb ingest --path /path/to/docs

# Query the knowledge base
naaf kb query "how do I build a workflow?"

# Start the API server
naaf kb serve --port 8080

Example: Full Workflow

#![allow(unused)]
fn main() {
use naaf_knowledge::{KnowledgeOrchestrator, IngestRequest, QueryRequest};
use naaf_core::Step;

let orchestrator = KnowledgeOrchestrator::new(
    qdrant_client,
    llm_client,
);

// Ingest documentation
orchestrator.ingest(&runtime, IngestRequest::new()
    .with_documents(load_docs("docs/").await?)
).await?;

// Query with context
let result = orchestrator.query(&runtime, QueryRequest::new()
    .with_query(user_query)
    .with_limit(3)
).await?;
}

Examples

Each example is a standalone binary in the examples/ directory. Run them with cargo run -p <example-name>.

Running Examples

# List all examples
ls examples/

# Run a specific example
cargo run -p step-retry

# Pass extra flags through to the example binary
cargo run -p step-retry -- --nocapture

Example Index

  • step-retry — Task-check-repair loop with a planning task that validates and retries
  • materialiser — Materialising task output into a different type before validation
  • join-reconcile — Parallel fan-out with .join() and fan-in with .reconcile_task()
  • composed-workflow — Sequential retry + parallel composition
  • dynamic-workflow — Runtime graph construction with Workflow, StepNode, and GraphPatch
  • process-task — Shell-command integration via naaf-process
  • build-test — Generate → materialise → validate → repair loop at the heart of naaf
  • knowledge-tool — Knowledge base integration
  • tui-demo — Terminal UI demonstration

Example Descriptions

step-retry

The core example: a task that generates output, validates it with a check, and retries with a repair planner on failure.

materialiser

Shows how to transform output between task and check. A task produces raw text, a materialiser writes it to a file, and the check validates the file.

join-reconcile

Parallel execution: use .join() to run two steps with the same input concurrently, then .reconcile_task() to merge their results.

composed-workflow

Combines sequential and parallel composition: step A → (step B + step C) → step D.

dynamic-workflow

Runtime-determined topology: nodes can spawn new nodes based on their output using spawn_with() and GraphPatch.

process-task

Shell command integration: wraps cargo check, cargo test, and other CLI tools as tasks and checks.

build-test

The quintessential workflow: generate code → write to file → compile → test → fix if needed.

knowledge-tool

Qdrant integration: ingest documentation and query it with semantic search.

tui-demo

Interactive terminal UI for observing workflow execution and providing human feedback.

Adding New Examples

  1. Create a new directory in examples/
  2. Add a Cargo.toml with the crate name
  3. Implement your example
  4. Add it to the members list in the workspace root Cargo.toml

# examples/my-example/Cargo.toml
[package]
name = "my-example"
version = "0.1.0"
edition = "2024"

[dependencies]
naaf-core = { path = "../crates/core" }
tokio = { version = "1", features = ["full"] }
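
Step 4 refers to the workspace root Cargo.toml. Assuming the workspace lists its members explicitly, the addition looks something like:

```toml
# Cargo.toml (workspace root) — hypothetical members list
[workspace]
members = [
    "crates/*",
    "examples/my-example",
]
```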