naaf - Not Another Agent Framework
naaf is a strongly typed Rust library for building asynchronous LLM workflows out of retrying steps. It provides composable, type-safe building blocks for workflows that plan, validate, repair, and rerun.
Core Philosophy
Every workflow follows the core loop: run a task, validate the output, and if validation fails, repair the input and try again. Steps compose sequentially, in parallel, or as dynamic graphs — all with full tracing support.
Status
naaf is early-stage. APIs may change between releases.
Features
- Type-safe workflows — Your Rust types flow through the entire pipeline. No stringly-typed dictionaries.
- Composable steps — Sequential, parallel, and dynamic graph composition.
- Built-in retry — Task-check-repair loop with configurable retry policies.
- Observability — Full tracing support for debugging and monitoring.
- Multiple backends — LLM integration (OpenAI, Anthropic), shell commands, and vector databases.
Crates
| Crate | Description |
|---|---|
naaf-core | Core traits, Step builder, Workflow runtime |
naaf-llm | LLM-backed adapters, tool calling |
naaf-process | Shell-command adapters |
naaf-qdrant | Qdrant vector database integration |
naaf-knowledge | Knowledge orchestration |
naaf-tui | Terminal UI for workflow observation |
naaf-persistence-fs | Filesystem checkpoint persistence |
naaf-persistence-sqlite | SQLite checkpoint persistence |
naaf-cli | CLI for knowledge base operations |
Quick Example
#![allow(unused)]
fn main() {
use naaf_core::{Check, RetryPolicy, Step, Task};
// Define your domain types
struct GeneratePatch;
struct CargoTest;
struct RepairPatch;
// Build a step with validation and automatic repair
let step = Step::builder(GeneratePatch)
.validate(CargoTest)
.repair_with(RepairPatch)
.retry_policy(RetryPolicy::new(3))
.build();
// Run the step
let result = step.run(&runtime, prompt).await?;
}
Next Steps
- Getting Started — Install and run your first workflow
- Core Concepts — Understand the building blocks
- Examples — Run standalone examples
Getting Started
This guide walks you through installing naaf and running your first workflow.
Installation
Add the crates you need to your Cargo.toml:
[dependencies]
naaf-core = "0.1.0" # Core traits and Step builder
naaf-llm = "0.1.0" # LLM-backed Task, Check, Materialiser, RepairPlanner
naaf-process = "0.1.0" # Shell-command-backed adapters
The naaf-llm crate has an optional openai feature for the built-in OpenAI client:
naaf-llm = { version = "0.1.0", features = ["openai"] }
Your First Workflow
This example creates a step that generates a code patch, validates it with cargo test, and retries on failure.
Dependencies
[dependencies]
naaf-core = "0.1.0"
naaf-process = "0.1.0"
tokio = { version = "1", features = ["full"] }
Code
use naaf_core::{Check, RetryPolicy, Step, Task};
use naaf_process::ProcessAgent;
use serde::{Deserialize, Serialize};
use std::process::Stdio;
use tokio::process::Command;
#[derive(Debug, Clone, Serialize, Deserialize)]
struct Patch(String);
#[derive(Debug, Clone, Serialize, Deserialize)]
struct PatchFindings {
failed: Vec<String>,
}
struct GeneratePatch;
#[async_trait::async_trait]
impl Task for GeneratePatch {
type Input = String;
type Output = Patch;
type Error = std::io::Error;
async fn run(&self, _: &naaf_core::Runtime, input: &String) -> Result<Patch, Self::Error> {
Ok(Patch(format!("// Generated patch for: {}\nfn solution() {{ }}\n", input)))
}
}
struct ValidatePatch;
#[async_trait::async_trait]
impl Check for ValidatePatch {
type Subject = Patch;
type Findings = PatchFindings;
type Error = std::io::Error;
async fn check(
&self,
_: &naaf_core::Runtime,
subject: &Patch,
) -> Result<Vec<Self::Findings>, Self::Error> {
let output = Command::new("sh")
.arg("-c")
.arg(format!("echo '{}' | cargo check --message-format=json 2>&1 || true", subject.0))
.stdout(Stdio::piped())
.output()
.await?;
let output_str = String::from_utf8_lossy(&output.stdout);
if output_str.contains("error") {
Ok(vec![PatchFindings {
failed: vec![output_str.to_string()],
}])
} else {
Ok(vec![])
}
}
}
struct RepairPatch;
#[async_trait::async_trait]
impl naaf_core::RepairPlanner for RepairPatch {
type Input = String;
type Output = String;
type Findings = PatchFindings;
async fn repair(
&self,
_: &naaf_core::Runtime,
attempts: &[naaf_core::Attempt<PatchFindings, Patch>],
) -> Result<String, Self::Error> {
let last_input = attempts.last().map(|a| a.input()).unwrap_or("");
let findings = attempts.last().map(|a| a.findings()).unwrap_or(&vec![]);
Ok(format!("retry with: {} failures: {:?}", last_input, findings))
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let runtime = naaf_core::Runtime::new();
let step = Step::builder(GeneratePatch)
.validate(ValidatePatch)
.repair_with(RepairPatch)
.retry_policy(RetryPolicy::new(3))
.build();
let result = step.run(&runtime, &"test input".to_string()).await;
match result {
Ok(output) => println!("Success: {:?}", output),
Err(e) => println!("Failed: {:?}", e),
}
Ok(())
}
Running Examples
Each example is a standalone binary in the examples/ directory:
# Run the step-retry example
cargo run -p step-retry
# Run the materialiser example
cargo run -p materialiser
# Run the dynamic-workflow example
cargo run -p dynamic-workflow
Next Steps
- Core Concepts — Understand the building blocks in detail
- Building Workflows — Compose steps into larger workflows
- Integrations — Connect to LLMs, shell commands, and databases
Core Concepts
naaf is built around four core traits, all driven by a shared Runtime. These traits form the building blocks of every workflow.
The Four Traits
| Trait | Purpose | Key Method |
|---|---|---|
Task | Produces an artefact from input | run(&self, runtime, input) |
Check | Validates a subject, returns findings | check(&self, runtime, subject) |
Materialiser | Transforms output (often with side effects) | materialise(&self, runtime, input) |
RepairPlanner | Produces the next input from failed attempts | repair(&self, runtime, attempts) |
Each trait is async, takes a runtime reference for capabilities, and returns a strongly typed result. No stringly-typed dictionaries — your Rust types flow through the entire pipeline.
Relationship Between Traits
Input → Task → Output → Materialiser → Subject → Check → Findings
↑ ↓
←←←←←← RepairPlanner ←←←←←←←←←←←←
The core loop works as follows:
- A
Taskproduces anOutput - A
Materialiseroptionally transforms the output into aSubject - A
Checkvalidates the subject and returnsFindings - If findings are non-empty, a
RepairPlannercan produce a new input for retry
Runtime
The Runtime provides shared capabilities to all traits:
#![allow(unused)]
fn main() {
use naaf_core::Runtime;
let runtime = Runtime::new();
}
You can extend the runtime with capabilities:
#![allow(unused)]
fn main() {
use naaf_llm::LlmClient;
use naaf_qdrant::QdrantClient;
// Extend with LLM and vector database capabilities
let runtime = Runtime::new()
.with_llm(LlmClient::new())
.with_vector_db(QdrantClient::new(url, api_key));
}
Guide to This Section
- Task — Produces output from input
- Check — Validates output, returns findings
- Materialiser — Transforms output between stages
- RepairPlanner — Plans retry input from failures
- Step — Combines Task, Check, Materialiser, and RepairPlanner
Task
The Task trait produces an artefact from input. It’s the starting point of every workflow.
Definition
#![allow(unused)]
fn main() {
#[async_trait::async_trait]
pub trait Task: Send + Sync {
type Input;
type Output;
type Error;
async fn run(&self, runtime: &Runtime, input: &Self::Input) -> Result<Self::Output, Self::Error>;
}
}
Parameters
Input— The type of data this task consumesOutput— The type of data this task producesError— The error type this task may return
Example
use naaf_core::{Runtime, Task};
struct GenerateCode;
#[async_trait::async_trait]
impl Task for GenerateCode {
type Input = String;
type Output = String;
type Error = std::io::Error;
async fn run(&self, _: &Runtime, input: &String) -> Result<String, std::io::Error> {
Ok(format!("fn solution() {{ // {}\n unimplemented!()\n}}", input))
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let runtime = Runtime::new();
let task = GenerateCode;
let output = task.run(&runtime, &"add two numbers".to_string()).await?;
println!("{}", output);
Ok(())
}
Using Runtime Capabilities
The runtime provides access to shared capabilities:
#![allow(unused)]
fn main() {
use naaf_core::{Runtime, Task};
use naaf_llm::LlmClient;
struct LlmTask {
prompt: String,
}
#[async_trait::async_trait]
impl Task for LlmTask {
type Input = String;
type Output = String;
type Error = std::io::Error;
async fn run(&self, runtime: &Runtime, input: &String) -> Result<String, std::io::Error> {
let llm = runtime.llm().ok_or_else(|| std::io::Error::new(
std::io::ErrorKind::NotFound,
"LLM not configured"
))?;
let full_prompt = format!("{}: {}", self.prompt, input);
let response = llm.complete(&full_prompt).await?;
Ok(response)
}
}
}
Task Combinators
Tasks can be combined using the TaskExt extension trait:
#![allow(unused)]
fn main() {
use naaf_core::TaskExt;
// Wrap a task with observability
let observed = my_task.observed();
// Wrap with a custom name
let named = my_task.observed_as("generate_code");
}
See Also
- Check — Validates task output
- Materialiser — Transforms task output
- Step — Combines Task with Check and RepairPlanner
Check
The Check trait validates a subject and returns findings. An empty findings list means the check passed.
Definition
#![allow(unused)]
fn main() {
#[async_trait::async_trait]
pub trait Check: Send + Sync {
type Subject;
type Findings;
type Error;
async fn check(
&self,
runtime: &Runtime,
subject: &Self::Subject,
) -> Result<Vec<Self::Findings>, Self::Error>;
}
}
Parameters
Subject— The type of data this check validatesFindings— The type of findings this check may returnError— The error type this check may return
Key Concept: Empty Findings
An empty findings list (vec![]) means the subject passed validation. Any non-empty list indicates failure.
Example
#![allow(unused)]
fn main() {
use naaf_core::{Check, Runtime};
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize)]
struct ValidationError {
message: String,
line: Option<u32>,
}
struct SyntaxCheck;
#[async_trait::async_trait]
impl Check for SyntaxCheck {
type Subject = String;
type Findings = ValidationError;
type Error = std::io::Error;
async fn check(
&self,
_: &Runtime,
subject: &String,
) -> Result<Vec<ValidationError>, std::io::Error> {
let mut findings = vec![];
for (i, line) in subject.lines().enumerate() {
if line.contains("unimplemented!()") && !line.starts_with("//") {
findings.push(ValidationError {
message: "Unimplemented code found".to_string(),
line: Some(i as u32 + 1),
});
}
}
Ok(findings)
}
}
}
Chaining Checks
Multiple checks form a validation pipeline:
#![allow(unused)]
fn main() {
let step = Step::builder(GenerateCode)
.validate(SyntaxCheck)
.validate(SecurityCheck)
.validate(PerformanceCheck)
.build();
}
Checks run in the order they are added. The step only passes if all checks return empty findings.
Using Findings in Repair
Findings feed directly into the repair process:
#![allow(unused)]
fn main() {
use naaf_core::RepairPlanner;
struct RepairCode;
#[async_trait::async_trait]
impl RepairPlanner for RepairCode {
type Input = String;
type Output = String;
type Findings = ValidationError;
async fn repair(
&self,
_: &Runtime,
attempts: &[naaf_core::Attempt<ValidationError, String>],
) -> Result<String, Self::Error> {
let findings = attempts.last()
.and_then(|a| a.findings())
.unwrap_or(&vec![]);
let mut instructions = String::new();
for finding in findings {
instructions.push_str(&format!("// TODO: {}\n", finding.message));
}
Ok(format!("// Repair based on {} findings\n{}",
findings.len(), instructions))
}
}
}
See Also
- Task — Produces the subject checked
- Materialiser — Transforms output before checking
- Recipe — Combines Task with Check and RepairPlanner
Materialiser
The Materialiser trait transforms task output into a different type, often with side effects. It’s positioned between the task and validation stages.
Definition
#![allow(unused)]
fn main() {
#[async_trait::async_trait]
pub trait Materialiser: Send + Sync {
type Input;
type Output;
type Error;
async fn materialise(
&self,
runtime: &Runtime,
input: &Self::Input,
) -> Result<Self::Output, Self::Error>;
}
}
Parameters
Input— The type this materialiser consumes (typically Task’s output)Output— The type this materialiser produces (the Check’s subject)Error— The error type this materialiser may return
When to Use Materialiser
- Type transformation — Convert LLM output to executable code
- File operations — Write task output to disk before validation
- API calls — Transform output and submit to external services
- Resource acquisition — Allocate resources needed for validation
Example: Write to File
#![allow(unused)]
fn main() {
use naaf_core::{Materialiser, Runtime};
use std::path::PathBuf;
struct WriteToFile {
path: PathBuf,
}
#[async_trait::async_trait]
impl Materialiser for WriteToFile {
type Input = String;
type Output = PathBuf;
type Error = std::io::Error;
async fn materialise(
&self,
_: &Runtime,
input: &String,
) -> Result<PathBuf, std::io::Error> {
tokio::fs::write(&self.path, input).await?;
Ok(self.path.clone())
}
}
}
Usage in a step:
#![allow(unused)]
fn main() {
use naaf_core::Step;
let step = Step::builder(GenerateCode)
.materialise(WriteToFile { path: PathBuf::from("/tmp/output.rs") })
.validate(CheckCompiled)
.build();
}
Example: Transform to Executable Format
#![allow(unused)]
fn main() {
use naaf_core::{Materialiser, Runtime};
struct ParseToJson;
#[async_trait::async_trait]
impl Materialiser for ParseToJson {
type Input = String; // Raw LLM output
type Output = serde_json::Value; // Parsed JSON
type Error = serde_json::Error;
async fn materialise(
&self,
_: &Runtime,
input: &String,
) -> Result<serde_json::Value, serde_json::Error> {
serde_json::from_str(input)
}
}
}
The materialiser transforms raw string output into structured JSON that the check can validate.
Multiple Materialisers
Chain materialisers when multiple transformations are needed:
#![allow(unused)]
fn main() {
let step = Step::builder(GenerateConfig)
.materialise(ParseConfig) // String → Value
.materialise(ValidateSchema) // Value → ValidatedValue
.materialise(WriteToFile) // ValidatedValue → PathBuf
.validate(CheckFileExists)
.build();
}
Each materialiser’s output type must match the next materialiser’s input type.
See Also
- Task — Produces the input for materialisation
- Check — Validates the materialised output
- Step — Combines all stages
RepairPlanner
The RepairPlanner trait produces a new input from failed attempts. It’s the intelligence behind the retry loop.
Definition
#![allow(unused)]
fn main() {
#[async_trait::async_trait]
pub trait RepairPlanner: Send + Sync {
type Input;
type Output;
type Findings;
async fn repair(
&self,
runtime: &Runtime,
attempts: &[Attempt<Self::Findings, Self::Input>],
) -> Result<Self::Output, Self::Error>;
}
}
Parameters
Input— The type of input that started the attemptOutput— The new input to retry withFindings— The type of findings from failed checks
Accessing Attempt History
The repair method receives all previous attempts, giving full context for repair decisions:
#![allow(unused)]
fn main() {
use naaf_core::{Attempt, RepairPlanner, Runtime};
struct LlmRepairPlanner {
system_prompt: String,
}
#[async_trait::async_trait]
impl RepairPlanner for LlmRepairPlanner {
type Input = String;
type Output = String;
type Findings = ValidationError;
async fn repair(
&self,
runtime: &Runtime,
attempts: &[Attempt<ValidationError, String>],
) -> Result<String, Self::Error> {
let last_attempt = attempts.last().ok_or_else(|| {
std::io::Error::new(std::io::ErrorKind::Other, "No attempts")
})?;
let history = attempts.iter().enumerate().map(|(i, a)| {
format!("Attempt {}:\nInput: {}\nFindings: {:?}",
i + 1, a.input(), a.findings())
}).join("\n---\n");
let prompt = format!(
"{}\n\nPrevious attempts:\n{}\n\nProduce a new input:",
self.system_prompt, history
);
let llm = runtime.llm().ok_or_else(|| {
std::io::Error::new(std::io::ErrorKind::NotFound, "LLM not configured")
})?;
let response = llm.complete(&prompt).await?;
Ok(response)
}
}
}
Attempt Structure
Each Attempt<F, I> contains:
| Method | Description |
|---|---|
input() | The original input for this attempt |
output() | The task’s output |
subject() | The materialised subject (if any) |
findings() | The findings from checks |
accepted() | Whether the step passed validation |
Repair Strategies
Simple retry with modified input
#![allow(unused)]
fn main() {
struct IncrementRetry;
#[async_trait::async_trait]
impl RepairPlanner for IncrementRetry {
type Input = u32;
type Output = u32;
type Findings = ();
async fn repair(
&self,
_: &Runtime,
attempts: &[Attempt<(), u32>],
) -> Result<u32, Self::Error> {
Ok(attempts.len() as u32 + 1)
}
}
}
LLM-based repair
Uses an LLM to analyze failures and generate better input:
#![allow(unused)]
fn main() {
struct SmartRepair {
model: String,
system_prompt: String,
}
#[async_trait::async_trait]
impl RepairPlanner for SmartRepair {
type Input = String;
type Output = String;
type Findings = ValidationError;
async fn repair(
&self,
runtime: &Runtime,
attempts: &[Attempt<ValidationError, String>],
) -> Result<String, Self::Error> {
// Analyze findings and ask LLM for new input
let llm = runtime.llm().expect("LLM configured");
let response = llm.complete(&build_repair_prompt(self.system_prompt, attempts)).await?;
Ok(response)
}
}
}
Enabling Retry
A step only retries when a RepairPlanner is attached:
#![allow(unused)]
fn main() {
let step = Step::builder(GenerateCode)
.validate(SyntaxCheck)
.repair_with(LlmRepairPlanner { system_prompt: "Fix the code".to_string() })
.retry_policy(RetryPolicy::new(3)) // Allow up to 3 attempts
.build();
}
Without .repair_with(), the step does not retry — it returns immediately after the first failure.
See Also
- Step — Combines Task, Check, Materialiser, and RepairPlanner
- RetryPolicy — Configures retry behavior
- LLM Integration — Use LLMs for repair planning
Step
A Step owns one task plus its validation pipeline and optional repair loop. The builder pattern enforces type safety at compile time.
Creating a Step
#![allow(unused)]
fn main() {
use naaf_core::{Check, RetryPolicy, Step, Task};
let step = Step::builder(GenerateCode)
.validate(SyntaxCheck)
.validate(SecurityCheck)
.repair_with(RepairCode)
.retry_policy(RetryPolicy::new(3))
.build();
}
Builder Methods
| Method | Description | Returns |
|---|---|---|
Step::builder(task) | Start building with a task | StepBuilder |
.validate(check) | Add a check to the pipeline | &mut StepBuilder |
.materialise(materialiser) | Add a materialiser | &mut StepBuilder |
.repair_with(planner) | Enable retry on failure | &mut StepBuilder |
.retry_policy(policy) | Configure max attempts | &mut StepBuilder |
.build() | Produce a runnable step | Step |
Type Safety
The builder enforces that types are compatible:
#![allow(unused)]
fn main() {
// This won't compile - type mismatch:
// Task output (String) doesn't match Check subject (u32)
Step::builder(GenerateString)
.validate(ValidateNumber) // Error!
.build();
}
Retry Policy
#![allow(unused)]
fn main() {
use naaf_core::RetryPolicy;
// Default: 1 attempt (no retry)
let policy = RetryPolicy::default();
// Custom: up to N attempts
let policy = RetryPolicy::new(3);
// Exponential backoff
let policy = RetryPolicy::new(3).with_backoff(
tokio::time::Duration::from_secs(1),
tokio::time::Duration::from_secs(10),
);
}
Running a Step
Basic run
#![allow(unused)]
fn main() {
let output = step.run(&runtime, &input).await?;
}
Run with tracing
#![allow(unused)]
fn main() {
use naaf_core::TracedOutput;
let traced = step.run_traced(&runtime, &input).await?;
println!("Output: {:?}", traced.output());
println!("Attempts: {}", traced.report().attempt_count());
for attempt in traced.report().attempts() {
println!(" accepted={}, findings={:?}", attempt.accepted(), attempt.findings());
}
}
Error Handling
Steps produce a StepError<F, E>:
#![allow(unused)]
fn main() {
use naaf_core::StepError;
match result {
Ok(output) => println!("Success: {:?}", output),
Err(StepError::System { stage, error }) => {
eprintln!("Infrastructure failure at {}: {:?}", stage, error);
}
Err(StepError::Rejected(report)) => {
eprintln!("Exhausted retries after {} attempts", report.attempt_count());
for attempt in report.attempts() {
eprintln!(" - findings: {:?}", attempt.findings());
}
}
}
}
Step Outputs
A Step can be used as a Task, Check, or Materialiser in other contexts:
#![allow(unused)]
fn main() {
use naaf_core::{Check, Task};
// Using a step as a Task in another step
let composed = Step::builder(step) // step implements Task
.validate(FinalCheck)
.build();
}
Finding Types
Steps track findings throughout execution. Access them via the report:
#![allow(unused)]
fn main() {
let traced = step.run_traced(&runtime, &input).await?;
let report = traced.report();
println!("Total attempts: {}", report.attempt_count());
println!("Final state: {:?}", report.final_state());
}
See Also
- Task — The task this step wraps
- Check — The validation checks
- Materialiser — Type transformations
- RepairPlanner — Retry logic
Building Workflows
Workflows compose individual steps into larger patterns. naaf provides three composition models:
- Sequential — Output from one step feeds into the next
- Parallel — Steps run concurrently, then reconcile
- Dynamic — Runtime-determined topology with
Workflow
Sequential Workflows
The simplest pattern: chain steps where output flows to input.
#![allow(unused)]
fn main() {
use naaf_core::Step;
// Sequential: A → B → C
let workflow = step_a
.then(step_b)
.then(step_c);
}
The output type of each step must match the input type of the next.
Parallel Workflows
Run multiple steps concurrently:
#![allow(unused)]
fn main() {
// Fan-out: same input, both run
let both = step_a.join(step_b);
// Fan-out: different inputs
let pair = step_a.zip(step_b);
// Fan-in: reconcile parallel outputs
let merged = both.reconcile_task(MergeResults);
}
Dynamic Workflows
For runtime-determined topologies, use Workflow with nodes:
#![allow(unused)]
fn main() {
use naaf_core::{StepNode, Workflow, NodeSpec, EdgeSpec, GraphPatch};
// Create typed step nodes
let plan_node = StepNode::new(
Step::builder(PlanProject).with_findings::<()>().build(),
|input: &NodeInput| input.seed_as::<ProjectBrief>(),
);
// Nodes can spawn downstream work
let root = StepNode::new(
Step::builder(root_step).with_findings::<()>().build(),
|input: &NodeInput| input.seed_as::<Input>(),
).spawn_with(|_context, output| {
GraphPatch::new()
.with_node(node_a)
.with_node(node_b)
.with_edge(EdgeSpec::new(root_id, node_a_id))
});
let workflow = Workflow::new()
.with_patch(GraphPatch::new().with_node(root_spec))?;
let report = workflow.run(&runtime).await?;
}
Guide to Composition
- Sequential Composition — Chain steps linearly
- Parallel Composition — Fan-out and fan-in
- Dynamic Workflows — Runtime graph construction
Sequential Composition
Sequential composition chains steps where output flows from one to the next.
The .then() Combinator
#![allow(unused)]
fn main() {
let workflow = step_a.then(step_b);
}
The output type of step_a must match the input type of step_b.
Example
#![allow(unused)]
fn main() {
use naaf_core::Step;
// A: String → Code
let generate = Step::builder(GenerateCode)
.validate(SyntaxCheck)
.build();
// B: Code → TestResults
let test = Step::builder(RunTests)
.validate(TestResultsCheck)
.build();
// C: TestResults → FinalOutput
let format = Step::builder(FormatOutput)
.validate(FinalCheck)
.build();
// Chain: String → Code → TestResults → FinalOutput
let pipeline = generate.then(test).then(format);
let output = pipeline.run(&runtime, &"build a counter".to_string()).await?;
}
Type Conversion
Use Materialiser for type conversion:
#![allow(unused)]
fn main() {
use naaf_core::Materialiser;
// A returns Code, B expects PathBuf
let generate_and_save = Step::builder(GenerateCode)
.materialise(WriteToFile)
.validate(FileCheck)
.build();
// B expects PathBuf, A returns String - types match now
let pipeline = generate_and_save.then(run_tests);
}
Short-circuit Evaluation
The pipeline stops on first error:
#![allow(unused)]
fn main() {
let result = pipeline.run(&runtime, &input).await;
match result {
Ok(output) => println!("Success: {:?}", output),
Err(e) => {
// Execution stopped at the failing step
eprintln!("Pipeline failed: {:?}", e);
}
}
}
With Retry
Each step in the pipeline can have its own retry policy:
#![allow(unused)]
fn main() {
use naaf_core::RetryPolicy;
let pipeline = Step::builder(GenerateCode) // No retry
.validate(SyntaxCheck)
.build()
.then(
Step::builder(RunTests) // With retry
.validate(TestCheck)
.repair_with(TestRepairer)
.retry_policy(RetryPolicy::new(3))
.build()
);
}
See Also
- Parallel Composition — Concurrent step execution
- Dynamic Workflows — Runtime-determined topology
Parallel Composition
Parallel composition runs multiple steps concurrently and reconciles their results.
Combinators
| Combinator | Description |
|---|---|
.join(other) | Same input, both run, output is a tuple |
.zip(other) | Different inputs, both run concurrently |
.reconcile_task(task) | Fan-in: reconcile parallel outputs |
.join() — Same Input
Both steps receive the same input and run concurrently:
#![allow(unused)]
fn main() {
let both = step_a.join(step_b);
// Input: String
// Output: (OutputA, OutputB)
let result = both.run(&runtime, &input).await?;
}
.zip() — Different Inputs
Steps receive different inputs:
#![allow(unused)]
fn main() {
use naaf_core::Step;
let generate_code = Step::builder(GenerateCode).build();
let generate_tests = Step::builder(GenerateTests).build();
let zipped = generate_code.zip(generate_tests);
// Input: (CodeRequest, TestRequest)
// Output: (Code, Tests)
let result = zipped.run(&runtime, &(
"build a counter".to_string(),
"test the counter".to_string(),
)).await?;
}
.reconcile_task() — Fan-in
Reconcile parallel outputs with a task:
#![allow(unused)]
fn main() {
use naaf_core::Step;
let branch_a = Step::builder(GenerateCode).build();
let branch_b = Step::builder(GenerateTests).build();
let merged = branch_a
.join(branch_b)
.reconcile_task(MergeCodeAndTests);
// Input: Request
// Output: MergedOutput
let result = merged.run(&runtime, &request).await?;
}
Example: Fan-out/Fan-in
#![allow(unused)]
fn main() {
use naaf_core::Step;
// Fan-out: same request to three generators
let search = Step::builder(SearchDocs).build();
let code = Step::builder(GenerateCode).build();
let test = Step::builder(GenerateTests).build();
let branches = search.join(code).join(test);
// Fan-in: reconcile the three results
let pipeline = branches.reconcile_task(CombineResults);
// Run concurrently
let output = pipeline.run(&runtime, &query).await?;
}
Concurrency
Steps in .join() and .zip() run concurrently via the runtime’s executor:
#![allow(unused)]
fn main() {
use naaf_core::Runtime;
let runtime = Runtime::new(); // Uses tokio runtime
// These run in parallel:
let result = combined.run(&runtime, &input).await;
}
Error Handling
If any branch fails, the whole pipeline fails:
#![allow(unused)]
fn main() {
match result {
Ok((a, b)) => println!("Both succeeded: {:?}, {:?}", a, b),
Err(e) => {
// One or both branches failed
eprintln!("Parallel execution failed: {:?}", e);
}
}
}
Partial results are not preserved — all branches must succeed.
With Retry
Each branch can have its own retry policy:
#![allow(unused)]
fn main() {
use naaf_core::RetryPolicy;
let robust = Step::builder(SearchDocs)
.validate(DocCheck)
.repair_with(DocRepairer)
.retry_policy(RetryPolicy::new(3))
.build()
.join(
Step::builder(GenerateCode)
.validate(CodeCheck)
.repair_with(CodeRepairer)
.retry_policy(RetryPolicy::new(5))
.build()
);
}
See Also
- Sequential Composition — Linear chaining
- Dynamic Workflows — Runtime-determined topology
Dynamic Workflows
Dynamic workflows construct their topology at runtime using Workflow, StepNode, and GraphPatch.
Core Types
| Type | Purpose |
|---|---|
StepNode | A node in the workflow graph |
NodeSpec | Defines a step’s requirements |
EdgeSpec | Defines connections between nodes |
GraphPatch | Adds nodes/edges at runtime |
Workflow | Executes the directed acyclic graph |
Creating Nodes
#![allow(unused)]
fn main() {
use naaf_core::{StepNode, NodeSpec, NodeInput};
let node = StepNode::new(
Step::builder(MyTask)
.validate(MyCheck)
.build(),
|input: &NodeInput| input.seed_as::<MyInput>(),
);
}
Each node has:
- A step to execute
- A closure that extracts/seeds input from node input
Spawning Dynamic Nodes
Nodes can spawn new nodes at runtime:
#![allow(unused)]
fn main() {
let root = StepNode::new(
Step::builder(RootStep).build(),
|input: &NodeInput| input.seed_as::<RootInput>(),
).spawn_with(|context, output| {
// Analyze output and decide what nodes to spawn
match output.needs_subtasks() {
true => GraphPatch::new()
.with_node(child_node_a)
.with_node(child_node_b)
.with_edge(EdgeSpec::new(context.node_id, child_node_a_id)),
false => GraphPatch::new(),
}
});
}
Building the Workflow
#![allow(unused)]
fn main() {
use naaf_core::{Workflow, GraphPatch};
let workflow = Workflow::new()
.with_patch(GraphPatch::new()
.with_node(root_spec)
.with_node(node_a)
.with_node(node_b)
.with_edge(EdgeSpec::new(root_id, node_a_id))
)?;
let report = workflow.run(&runtime).await?;
}
Scheduling
The runtime schedules nodes when their dependencies complete:
- Nodes with no dependencies run immediately
- Nodes with satisfied dependencies run in parallel
- Downstream patches are applied when parent nodes complete
Node Input
Each node receives a NodeInput:
#![allow(unused)]
fn main() {
fn extract_input(input: &NodeInput) -> MyType {
// Get from seed
input.seed_as::<MyType>()
// Or from previous node output
input.get_output::<PreviousOutput>(node_id)
}
}
Example: Dynamic Task Generation
#![allow(unused)]
fn main() {
use naaf_core::{StepNode, Workflow, GraphPatch, NodeInput, EdgeSpec};
let plan_node = StepNode::new(
Step::builder(PlanTasks).build(),
|input: &NodeInput| input.seed_as::<ProjectBrief>(),
).spawn_with(|context, plan| {
let mut patch = GraphPatch::new();
for (i, task) in plan.tasks.iter().enumerate() {
let task_node = StepNode::new(
Step::builder(ExecuteTask)
.validate(TaskCheck)
.build(),
|input: &NodeInput| input.seed_as::<TaskInput>(),
);
patch = patch
.with_node(task_node)
.with_edge(EdgeSpec::new(context.node_id, task_node_id));
}
patch
};
let workflow = Workflow::new()
.with_patch(GraphPatch::new().with_node(plan_node))?;
let report = workflow.run(&runtime).await?;
}
Error Handling
The workflow report contains results from all nodes:
#![allow(unused)]
fn main() {
let report = workflow.run(&runtime).await?;
for (node_id, result) in report.node_results() {
match result {
Ok(output) => println!("Node {}: {:?}", node_id, output),
Err(e) => eprintln!("Node {} failed: {:?}", node_id, e),
}
}
}
See Also
- Sequential Composition — Linear chaining
- Parallel Composition — Concurrent execution
Observability
naaf provides structured tracing for debugging and monitoring workflows. All core traits have extension methods for observability.
TaskExt
Wrap any task with observability:
#![allow(unused)]
fn main() {
use naaf_core::TaskExt;
let observed = my_task.observed(); // Auto-named
let observed = my_task.observed_as("name"); // Custom name
}
CheckExt
Wrap checks:
#![allow(unused)]
fn main() {
use naaf_core::CheckExt;
let observed_check = my_check.observed();
let observed_check = my_check.observed_as("validate_output");
}
Running with Tracing
Use run_traced() to get detailed execution reports:
#![allow(unused)]
fn main() {
use naaf_core::TracedOutput;
let traced = step.run_traced(&runtime, &input).await?;
println!("Output: {:?}", traced.output());
println!("Attempts: {}", traced.report().attempt_count());
for attempt in traced.report().attempts() {
println!(" accepted={}, findings={:?}",
attempt.accepted(),
attempt.findings()
);
}
}
AttemptReport
Each attempt contains:
| Method | Description |
|---|---|
input() | Original input |
output() | Task output |
subject() | Materialised subject |
findings() | Validation findings |
accepted() | Whether check passed |
TracedOutput
#![allow(unused)]
fn main() {
struct TracedOutput<F, O> {
output: O,
report: StepReport<F>,
}
}
Methods:
output()— Get the final outputreport()— Get the execution report
StepReport
#![allow(unused)]
fn main() {
struct StepReport<F> {
attempts: Vec<AttemptReport<F>>,
}
}
Methods:
attempt_count()— Total attempts madeattempts()— Iterator over attemptsfinal_state()— Final acceptance statefindings()— All findings from all attempts
Structured Logging
naaf uses the tracing crate:
#![allow(unused)]
fn main() {
use tracing::{info, error, warn};
info!(step = "generate", "running step");
warn!(findings = ?findings, "validation failed, retrying");
error!(attempt = 3, "retry exhausted");
}
Configuration
Enable tracing in your runtime:
#![allow(unused)]
fn main() {
use tracing_subscriber::fmt;
use tracing_subscriber::prelude::*;
tracing_subscriber::registry()
.with(fmt::layer())
.init();
}
See Also
- Examples — See observability in action
Integrations
naaf integrates with multiple backends for LLM calls, shell commands, and vector databases.
Available Integrations
| Integration | Crate | Description |
|---|---|---|
| LLM | naaf-llm | OpenAI, Anthropic, custom providers |
| Process | naaf-process | Shell command execution |
| Vector DB | naaf-qdrant | Qdrant vector database |
| Knowledge | naaf-knowledge | Knowledge base orchestration |
Guide to Integrations
- LLM Integration — Connect to language models
- Process Integration — Run shell commands
- Knowledge Base — Build knowledge-assisted workflows
LLM Integration
The naaf-llm crate provides LLM-backed adapters, tool calling, and multiple provider support.
Setup
[dependencies]
naaf-llm = "0.1.0"
With OpenAI support:
naaf-llm = { version = "0.1.0", features = ["openai"] }
LlmClient
#![allow(unused)]
fn main() {
use naaf_llm::LlmClient;
let client = LlmClient::builder()
.api_key(std::env::var("OPENAI_API_KEY")?)
.model("gpt-4")
.build();
}
LlmAgent
The LlmAgent provides a shared executor that can be projected into any core trait:
#![allow(unused)]
fn main() {
use naaf_llm::{LlmAgent, Message, CompletionRequest, ExecutionOutcome};
let agent = LlmAgent::new(client);
// Project into Task
let task = agent.task(
|_, input: String| Ok(CompletionRequest::new("gpt-4", vec![Message::user(input)])),
|outcome: ExecutionOutcome| Ok(outcome.final_message().content.clone().unwrap_or_default()),
);
// Project into Check
let check = agent.check(
|_, subject: String| Ok(CompletionRequest::new("gpt-4", vec![Message::user(subject)])),
|outcome: ExecutionOutcome| serde_json::from_str(&outcome.final_message().content.as_deref().unwrap_or("[]")),
);
// Project into RepairPlanner
let repair = agent.repair(
|_, attempts: Vec<Attempt<Findings, Input>| { ... },
|outcome: ExecutionOutcome| Ok(outcome.final_message().content.clone().unwrap_or_default()),
);
}
Tool Calling
Define tools for the LLM to use:
#![allow(unused)]
fn main() {
use naaf_llm::{Tool, ToolRegistry, ToolSpec};
struct SearchTool;
impl Tool for SearchTool {
type Input = String;
type Output = Vec<SearchResult>;
fn spec(&self) -> ToolSpec {
ToolSpec::new("search", "Search the knowledge base")
.add_parameter("query", "string", "The search query")
}
async fn execute(&self, runtime: &Runtime, input: &String) -> Result<Vec<SearchResult>, Self::Error> {
// Execute the tool
Ok(search(&runtime, input).await?)
}
}
let registry = ToolRegistry::new()
.with_tool(SearchTool);
}
Dynamic Spawning
Spawn new nodes from tool calls:
#![allow(unused)]
fn main() {
use naaf_llm::{SpawnTool, SpawnResult, resolve_spawn};
let spawn_tool = SpawnTool::new(registry);
let outcome = llm.execute_with_tools(&request, &[spawn_tool]).await?;
if let Some(SpawnResult { spawn, .. }) = outcome.tool_calls().first() {
let patch = resolve_spawn(spawn, node_context).await?;
workflow.apply_patch(patch)?;
}
}
Built-in Adapters
| Adapter | Description |
|---|---|
LlmTask | Project LLM as a Task |
LlmCheck | Project LLM as a Check |
LlmMaterialiser | Project LLM as a Materialiser |
LlmRepairPlanner | Project LLM as a RepairPlanner |
Providers
OpenAI
#![allow(unused)]
fn main() {
use naaf_llm::OpenAIClient;
let client = OpenAIClient::new(api_key, "gpt-4");
}
Anthropic
#![allow(unused)]
fn main() {
use naaf_llm::AnthropicClient;
let client = AnthropicClient::new(api_key, "claude-3-opus-20240229");
}
Custom Provider
Implement the LlmProvider trait for other providers:
#![allow(unused)]
fn main() {
impl LlmProvider for MyProvider {
async fn complete(&self, request: &CompletionRequest) -> Result<CompletionResponse, Self::Error>;
async fn complete_with_tools(&self, request: &CompletionRequest, tools: &[Box<dyn Tool>]) -> Result<CompletionResponse, Self::Error>;
}
}
Process Integration
The naaf-process crate wraps shell commands into core traits.
Setup
[dependencies]
naaf-process = "0.1.0"
ProcessAgent
#![allow(unused)]
fn main() {
use naaf_process::ProcessAgent;
let agent = ProcessAgent::new();
}
Converting Commands to Traits
ProcessTask
#![allow(unused)]
fn main() {
use naaf_process::{ProcessCommand, ProcessOutput};
let task = agent.task(
|_, script: String| Ok(ProcessCommand::shell(script)),
|output: ProcessOutput| String::from_utf8(output.stdout),
);
}
ProcessCheck
#![allow(unused)]
fn main() {
use naaf_process::ProcessOutput;
let check = agent.check(
|_, subject: String| Ok(ProcessCommand::shell(format!("echo '{}' | sh", subject))),
|output: ProcessOutput| {
let result = String::from_utf8_lossy(&output.stdout);
if result.contains("error") {
vec![CheckError { message: result.to_string() }]
} else {
vec![]
}
},
);
}
ProcessMaterialiser
#![allow(unused)]
fn main() {
use naaf_process::{ProcessOutput, FilePath};
let materialiser = agent.materialiser(
|_, content: String| {
let path = "/tmp/script.sh";
std::fs::write(path, &content)?;
Ok(ProcessCommand::shell(format!("chmod +x {}", path)))
},
|output: ProcessOutput| Ok(FilePath::from("/tmp/script.sh")),
);
}
ProcessRepairPlanner
#![allow(unused)]
fn main() {
use naaf_core::Attempt;
let repair = agent.repair(
|_, attempts: &[Attempt<Error, String>]| {
let last = attempts.last().unwrap();
let suggestion = format!("# Suggestion: {}\n{}", last.findings().first().map(|f| f.message.clone()).unwrap_or_default());
Ok(ProcessCommand::shell(suggestion))
},
|output: ProcessOutput| String::from_utf8(output.stdout),
);
}
ProcessCommand
#![allow(unused)]
fn main() {
use naaf_process::ProcessCommand;
// Shell command
ProcessCommand::shell("ls -la")
// Direct program with args
ProcessCommand::new("cargo", &["build", "--release"])
// With working directory
ProcessCommand::shell("cargo test")
.cwd("/path/to/project")
}
ProcessOutput
#![allow(unused)]
fn main() {
struct ProcessOutput {
stdout: Vec<u8>,
stderr: Vec<u8>,
status: ExitStatus,
}
}
Working Directory
#![allow(unused)]
fn main() {
let task = agent.task(
|_, script: String| {
Ok(ProcessCommand::shell(script)
.cwd("/path/to/project"))
},
|output: ProcessOutput| { ... },
);
}
Environment Variables
#![allow(unused)]
fn main() {
let task = agent.task(
|_, script: String| {
Ok(ProcessCommand::shell(script)
.env("RUST_LOG", "debug"))
},
|output: ProcessOutput| { ... },
);
}
Example: Cargo Workflow
#![allow(unused)]
fn main() {
use naaf_core::Step;
let generate = Step::builder(GenerateCode).build();
let cargo_check = agent.task(
|_, code: String| Ok(ProcessCommand::shell(format!("echo '{}' | cargo check 2>&1", code))),
|output: ProcessOutput| {
let err = String::from_utf8_lossy(&output.stderr);
if err.contains("error") || err.contains("warning") {
Err(StepError::Rejected(vec![Findings { message: err.to_string() }]))
} else {
Ok(code)
}
},
);
let workflow = generate.then(cargo_check);
}
See Also
- Examples — Standalone process examples
- LLM Integration — Combine LLM with process
Knowledge Base
The naaf-knowledge crate provides knowledge orchestration with Qdrant vector database.
Setup
[dependencies]
naaf-knowledge = "0.1.0"
naaf-qdrant = "0.1.0"
Architecture
User Input → KnowledgeIngest → Qdrant
↓
Query Request → KnowledgeQuery → Qdrant → LLM Context
↓
Lint Request → KnowledgeLint → Qdrant → LLM Feedback
QdrantClient
#![allow(unused)]
fn main() {
use naaf_qdrant::QdrantClient;
let client = QdrantClient::new(
"http://localhost:6333",
Some("api-key"),
);
}
Knowledge Operations
Ingest
#![allow(unused)]
fn main() {
use naaf_knowledge::{IngestRequest, IngestResponse};
let request = IngestRequest::new()
.with_documents(docs)
.with_chunking_config(ChunkingConfig::default()
.chunk_size(1024)
.overlap(128))
.with_embeddings(embeddings_config);
let response = knowledge.ingest(&runtime, request).await?;
}
Query
#![allow(unused)]
fn main() {
use naaf_knowledge::{QueryRequest, QueryResponse};
let request = QueryRequest::new()
.with_query("how do I build a step?")
.with_limit(5)
.with_threshold(0.7);
let response = knowledge.query(&runtime, request).await?;
for result in response.results() {
println!("Score: {:.2}, Content: {}", result.score(), result.content());
}
}
Lint
#![allow(unused)]
fn main() {
use naaf_knowledge::{LintRequest, LintResponse};
let request = LintRequest::new()
.with_code(code)
.with_context(context_docs);
let response = knowledge.lint(&runtime, request).await?;
}
Chunking Strategies
Fixed Size
#![allow(unused)]
fn main() {
use naaf_qdrant::ChunkingConfig;
ChunkingConfig::default()
.chunk_size(1024)
.overlap(128)
}
Semantic
Split by paragraphs, sections, or code blocks:
#![allow(unused)]
fn main() {
ChunkingConfig::semantic()
.separators(&["\n\n", "\n", " "])
.min_chunk_size(100)
.max_chunk_size(2048)
}
Embeddings
#![allow(unused)]
fn main() {
use naaf_qdrant::{EmbeddingAdapter, EmbeddingRequest};
let adapter = naaf_qdrant::OpenAIEmbeddings::new(client.clone(), "text-embedding-ada-002");
let request = EmbeddingRequest::new()
.with_texts(texts);
let embeddings = adapter.embed(&runtime, request).await?;
}
Collections
#![allow(unused)]
fn main() {
use naaf_qdrant::{client, CreateCollection};
client.create_collection("my-collection").await?;
let collection = client.collection("my-collection");
}
CLI
The naaf-cli crate provides a CLI for knowledge base operations:
# Ingest documents
naaf kb ingest --path /path/to/docs
# Query the knowledge base
naaf kb query "how do I build a workflow?"
# Start the API server
naaf kb serve --port 8080
Example: Full Workflow
#![allow(unused)]
fn main() {
use naaf_knowledge::{KnowledgeOrchestrator, IngestRequest, QueryRequest};
use naaf_core::Step;
let orchestrator = KnowledgeOrchestrator::new(
qdrant_client,
llm_client,
);
// Ingest documentation
tokio::spawn(async move {
orchestrator.ingest(&runtime, IngestRequest::new()
.with_documents(load_docs("docs/").await?)
).await.unwrap();
});
// Query with context
let result = orchestrator.query(&runtime, QueryRequest::new()
.with_query(user_query)
.with_limit(3)
).await?;
}
See Also
- Qdrant Integration — Vector database
- Examples — Knowledge base examples
Examples
Each example is a standalone binary in the examples/ directory. Run them with cargo run -p <example-name>.
Running Examples
# List all examples
ls examples/
# Run a specific example
cargo run -p step-retry
# Run with output visible
cargo run -p step-retry -- --nocapture
Example Index
| Example | Description |
|---|---|
| step-retry | Task-check-repair loop with a planning task that validates and retries |
| materialiser | Materialising task output into a different type before validation |
| join-reconcile | Parallel fan-out with .join() and fan-in with .reconcile_task() |
| composed-workflow | Sequential retry + parallel composition |
| dynamic-workflow | Runtime graph construction with Workflow, StepNode, and GraphPatch |
| process-task | Shell-command integration via naaf-process |
| build-test | Generate → materialise → validate → repair loop at the heart of naaf |
| knowledge-tool | Knowledge base integration |
| tui-demo | Terminal UI demonstration |
Example Descriptions
step-retry
The core example: a task that generates output, validates it with a check, and retries with a repair planner on failure.
materialiser
Shows how to transform output between task and check. A task produces raw text, a materialiser writes it to a file, and the check validates the file.
join-reconcile
Parallel execution: use .join() to run two steps with the same input concurrently, then .reconcile_task() to merge their results.
composed-workflow
Combines sequential and parallel composition: step A → (step B + step C) → step D.
dynamic-workflow
Runtime-determined topology: nodes can spawn new nodes based on their output using spawn_with() and GraphPatch.
process-task
Shell command integration: wraps cargo check, cargo test, and other CLI tools as tasks and checks.
build-test
The quintessential workflow: generate code → write to file → compile → test → fix if needed.
knowledge-tool
Qdrant integration: ingest documentation and query it with semantic search.
tui-demo
Interactive terminal UI for observing workflow execution and providing human feedback.
Adding New Examples
- Create a new directory in
examples/ - Add a
Cargo.tomlwith the crate name - Implement your example
- Add it to
Cargo.tomlworkspace members
# examples/my-example/Cargo.toml
[package]
name = "my-example"
version = "0.1.0"
edition = "2024"
[dependencies]
naaf-core = { path = "../crates/core" }
tokio = { version = "1", features = ["full"] }