Parallel Composition

Parallel composition runs multiple steps concurrently and reconciles their results.

Combinators

| Combinator | Description |
| --- | --- |
| .join(other) | Same input, both run, output is a tuple |
| .zip(other) | Different inputs, both run concurrently |
| .reconcile_task(task) | Fan-in: reconcile parallel outputs |

.join() — Same Input

Both steps receive the same input and run concurrently:

let both = step_a.join(step_b);

// Input: String
// Output: (OutputA, OutputB)
let result = both.run(&runtime, &input).await?;
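To make the data flow concrete, here is a minimal std-only sketch of the same shape: two functions borrow one input, run at the same time, and their outputs come back as a tuple. It is an illustration only, not how naaf_core implements `.join()`. The names `join_same_input`, `word_count`, and `shout` are hypothetical, and OS threads stand in for the runtime's executor.

```rust
use std::thread;

// Hypothetical stand-ins for two steps; neither is part of naaf_core.
fn word_count(input: &str) -> usize {
    input.split_whitespace().count()
}

fn shout(input: &str) -> String {
    input.to_uppercase()
}

// Run two functions on the same borrowed input at the same time and
// return both outputs as a tuple, mirroring the shape of `.join()`.
fn join_same_input<A, B, FA, FB>(input: &str, a: FA, b: FB) -> (A, B)
where
    A: Send,
    B: Send,
    FA: FnOnce(&str) -> A + Send,
    FB: FnOnce(&str) -> B + Send,
{
    thread::scope(|s| {
        let left = s.spawn(|| a(input));
        let right = s.spawn(|| b(input));
        (
            left.join().expect("left branch panicked"),
            right.join().expect("right branch panicked"),
        )
    })
}
```

Calling `join_same_input("build a counter", word_count, shout)` yields the pair `(3, "BUILD A COUNTER")`: one input, two outputs.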

.zip() — Different Inputs

Each step receives its own input, supplied together as a tuple, and both run concurrently:

use naaf_core::Step;

let generate_code = Step::builder(GenerateCode).build();
let generate_tests = Step::builder(GenerateTests).build();

let zipped = generate_code.zip(generate_tests);

// Input: (CodeRequest, TestRequest)
// Output: (Code, Tests)
let result = zipped.run(&runtime, &(
    "build a counter".to_string(),
    "test the counter".to_string(),
)).await?;
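The difference from `.join()` is only in how the input is routed: the first tuple element goes to the first step and the second element to the second step. A minimal std-only sketch of that routing follows. `zip_inputs` is a hypothetical helper, not a naaf_core function, and it uses scoped OS threads rather than the library's runtime.

```rust
use std::thread;

// Run `a` on the first tuple element and `b` on the second, concurrently,
// and return the two outputs as a tuple.
fn zip_inputs<IA, IB, A, B>(
    inputs: &(IA, IB),
    a: impl FnOnce(&IA) -> A + Send,
    b: impl FnOnce(&IB) -> B + Send,
) -> (A, B)
where
    IA: Sync,
    IB: Sync,
    A: Send,
    B: Send,
{
    // Split the borrowed tuple into one reference per branch.
    let (in_a, in_b) = inputs;
    thread::scope(|s| {
        let left = s.spawn(move || a(in_a));
        let right = s.spawn(move || b(in_b));
        (
            left.join().expect("left branch panicked"),
            right.join().expect("right branch panicked"),
        )
    })
}
```

Each branch sees only its own element, so the two input types do not need to match.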

.reconcile_task() — Fan-in

Reconcile parallel outputs with a task:

use naaf_core::Step;

let branch_a = Step::builder(GenerateCode).build();
let branch_b = Step::builder(GenerateTests).build();

let merged = branch_a
    .join(branch_b)
    .reconcile_task(MergeCodeAndTests);

// Input: Request
// Output: MergedOutput
let result = merged.run(&runtime, &request).await?;

Example: Fan-out/Fan-in

use naaf_core::Step;

// Fan-out: same request to three generators
let search = Step::builder(SearchDocs).build();
let code = Step::builder(GenerateCode).build();
let test = Step::builder(GenerateTests).build();

let branches = search.join(code).join(test);

// Fan-in: reconcile the three results
let pipeline = branches.reconcile_task(CombineResults);

// Run concurrently
let output = pipeline.run(&runtime, &query).await?;
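One detail worth checking when chaining joins is the output shape. If each `.join()` pairs the receiver's output with the other step's output, as the two-step case suggests, then chaining twice would produce a left-nested tuple `((A, B), C)` rather than a flat triple. That shape is an assumption here and is not confirmed by this page. Under it, a reconcile function might unpack its input as sketched below. `flatten3` and `combine_results` are hypothetical helpers.

```rust
// Assumed shape: joining a step that yields (A, B) with one that yields C
// gives ((A, B), C). This helper flattens it into a plain triple.
fn flatten3<A, B, C>(nested: ((A, B), C)) -> (A, B, C) {
    let ((a, b), c) = nested;
    (a, b, c)
}

// Hypothetical reconcile function: join the three outputs with newlines.
fn combine_results(nested: ((String, String), String)) -> String {
    let (docs, code, tests) = flatten3(nested);
    [docs, code, tests].join("\n")
}
```

If the library flattens chained joins itself, the helper is unnecessary and the reconcile task can take the triple directly.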

Concurrency

Steps in .join() and .zip() run concurrently via the runtime’s executor:

use naaf_core::Runtime;

let runtime = Runtime::new();  // Uses tokio runtime

// These run in parallel:
let result = combined.run(&runtime, &input).await;
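A simple way to convince yourself that two branches really overlap, rather than run back to back, is to make each one wait for the other. The std-only sketch below does this with a `Barrier`. It uses OS threads, not the tokio-based runtime mentioned above, and `branches_overlap` is a hypothetical name.

```rust
use std::sync::Barrier;
use std::thread;

// Each branch blocks at the barrier until the other one arrives. If the
// branches ran one after the other, the first would wait forever, so the
// function returning at all shows that they were in flight together.
fn branches_overlap() -> (u32, u32) {
    let barrier = Barrier::new(2);
    thread::scope(|s| {
        let a = s.spawn(|| {
            barrier.wait();
            1
        });
        let b = s.spawn(|| {
            barrier.wait();
            2
        });
        (
            a.join().expect("first branch panicked"),
            b.join().expect("second branch panicked"),
        )
    })
}
```

The same rendezvous idea can be adapted to async code when testing a real pipeline, using whichever synchronization primitives the async runtime provides.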

Error Handling

If any branch fails, the whole pipeline fails:

match result {
    Ok((a, b)) => println!("Both succeeded: {:?}, {:?}", a, b),
    Err(e) => {
        // One or both branches failed
        eprintln!("Parallel execution failed: {:?}", e);
    }
}

Partial results are not preserved — all branches must succeed.
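The all-or-nothing rule can be sketched with plain `Result` values: collect both branch results, then combine them with `?` so that any error wins and the other branch's value is dropped. This is a std-only illustration with a hypothetical `join_fallible` helper. It waits for both branches to finish before checking for errors; whether naaf_core cancels the surviving branch early is not stated here.

```rust
use std::thread;

// Run two fallible branches concurrently and succeed only if both do.
fn join_fallible<A, B, E>(
    a: impl FnOnce() -> Result<A, E> + Send,
    b: impl FnOnce() -> Result<B, E> + Send,
) -> Result<(A, B), E>
where
    A: Send,
    B: Send,
    E: Send,
{
    let (ra, rb) = thread::scope(|s| {
        let left = s.spawn(a);
        let right = s.spawn(b);
        (
            left.join().expect("left branch panicked"),
            right.join().expect("right branch panicked"),
        )
    });
    // `?` returns the first error it meets. A successful value from the
    // other branch is discarded, so no partial result escapes.
    Ok((ra?, rb?))
}
```

If both branches fail, this sketch reports the left branch's error, which is one reasonable choice among several.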

With Retry

Each branch can have its own retry policy:

use naaf_core::{RetryPolicy, Step};

let robust = Step::builder(SearchDocs)
    .validate(DocCheck)
    .repair_with(DocRepairer)
    .retry_policy(RetryPolicy::new(3))
    .build()
    .join(
        Step::builder(GenerateCode)
            .validate(CodeCheck)
            .repair_with(CodeRepairer)
            .retry_policy(RetryPolicy::new(5))
            .build()
    );
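The point of per-branch policies is that each branch spends its own retry budget independently before the results are joined. The std-only sketch below illustrates that idea with a hypothetical `retry` helper and two simulated branches. It assumes the number passed is a maximum attempt count; whether `RetryPolicy::new(3)` means three attempts or three retries is not specified here.

```rust
use std::thread;

// Call `attempt` until it succeeds or `max_attempts` calls have been made.
// Always makes at least one call. The closure receives the 1-based attempt
// number. Returns the first success or the last error.
fn retry<T, E>(
    max_attempts: u32,
    mut attempt: impl FnMut(u32) -> Result<T, E>,
) -> Result<T, E> {
    let mut n = 1;
    loop {
        match attempt(n) {
            Ok(value) => return Ok(value),
            Err(e) if n >= max_attempts => return Err(e),
            Err(_) => n += 1,
        }
    }
}

// Two simulated branches with separate budgets, run concurrently.
fn run_branches() -> (Result<&'static str, String>, Result<&'static str, String>) {
    thread::scope(|s| {
        // Succeeds on attempt 2, within its budget of 3.
        let docs = s.spawn(|| {
            retry(3, |n| {
                if n < 2 { Err(format!("docs attempt {} failed", n)) } else { Ok("docs") }
            })
        });
        // Succeeds on attempt 4, within its budget of 5.
        let code = s.spawn(|| {
            retry(5, |n| {
                if n < 4 { Err(format!("code attempt {} failed", n)) } else { Ok("code") }
            })
        });
        (
            docs.join().expect("docs branch panicked"),
            code.join().expect("code branch panicked"),
        )
    })
}
```

Because retries happen inside each branch, a slow or flaky branch does not consume the other branch's attempts.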

See Also