Dynamic Workflows

Dynamic workflows construct their topology at runtime using Workflow, StepNode, and GraphPatch.

Core Types

| Type | Purpose |
|---|---|
| StepNode | A node in the workflow graph |
| NodeSpec | Defines a step’s requirements |
| EdgeSpec | Defines connections between nodes |
| GraphPatch | Adds nodes/edges at runtime |
| Workflow | Executes the directed acyclic graph |

Creating Nodes

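use naaf_core::{NodeInput, Step, StepNode}; // assumes `Step` is also exported by naaf_core

// `MyTask`, `MyCheck`, and `MyInput` are placeholders for your own task, validator, and input types.
let node = StepNode::new(
    // The step to execute, with a validation check attached.
    Step::builder(MyTask)
        .validate(MyCheck)
        .build(),
    // The closure that extracts this step's input from the node input (here, the seed).
    |input: &NodeInput| input.seed_as::<MyInput>(),
);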

Each node has:

  • A step to execute
  • A closure that extracts the step's input from the NodeInput, for example from the seed
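The pairing of a step with an input-extracting closure can be illustrated without the library. The following is a minimal sketch using only the standard library; `ToyNode` and its methods are hypothetical names invented for this illustration and are not part of naaf_core.

```rust
/// Toy stand-in for a workflow node (hypothetical, not the naaf_core type):
/// it stores a step and a closure that derives the step's input.
pub struct ToyNode<S, E> {
    step: S,
    extract: E,
}

impl<S, E> ToyNode<S, E> {
    /// Pair a step with the closure that prepares its input.
    pub fn new(step: S, extract: E) -> Self {
        Self { step, extract }
    }

    /// Run the node: first extract the typed input from the raw node input,
    /// then hand it to the step.
    pub fn run<I, O>(&self, raw: &str) -> O
    where
        E: Fn(&str) -> I,
        S: Fn(I) -> O,
    {
        (self.step)((self.extract)(raw))
    }
}
```

Keeping extraction separate from execution means the same step can be reused in nodes that obtain their input in different ways.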

Spawning Dynamic Nodes

Nodes can spawn new nodes at runtime:

// `child_node_a`, `child_node_b`, and `child_node_a_id` are assumed to be defined earlier.
let root = StepNode::new(
    Step::builder(RootStep).build(),
    |input: &NodeInput| input.seed_as::<RootInput>(),
).spawn_with(|context, output| {
    // Inspect the output and decide which nodes to spawn.
    if output.needs_subtasks() {
        GraphPatch::new()
            .with_node(child_node_a)
            .with_node(child_node_b)
            // Only child_node_a is made to depend on this node.
            .with_edge(EdgeSpec::new(context.node_id, child_node_a_id))
    } else {
        // An empty patch leaves the graph unchanged.
        GraphPatch::new()
    }
});
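To make the idea of a patch concrete, here is a self-contained sketch of a builder-style patch applied to a growing graph. `ToyPatch`, `ToyGraph`, and `spawn_children` are hypothetical names for this illustration only. The validation rule (reject duplicate nodes and edges to unknown nodes) is an assumption, not documented naaf_core behavior.

```rust
use std::collections::BTreeSet;

/// Toy stand-in for a graph patch: a batch of nodes and edges to add.
#[derive(Debug, Default, Clone, PartialEq)]
pub struct ToyPatch {
    pub nodes: Vec<String>,
    pub edges: Vec<(String, String)>,
}

impl ToyPatch {
    pub fn new() -> Self {
        Self::default()
    }
    pub fn with_node(mut self, id: &str) -> Self {
        self.nodes.push(id.to_string());
        self
    }
    pub fn with_edge(mut self, from: &str, to: &str) -> Self {
        self.edges.push((from.to_string(), to.to_string()));
        self
    }
    pub fn is_empty(&self) -> bool {
        self.nodes.is_empty() && self.edges.is_empty()
    }
}

/// Toy graph that grows by applying patches.
#[derive(Debug, Default)]
pub struct ToyGraph {
    pub nodes: BTreeSet<String>,
    pub edges: BTreeSet<(String, String)>,
}

impl ToyGraph {
    /// Apply a patch all-or-nothing: on error the graph is left untouched.
    pub fn apply(&mut self, patch: ToyPatch) -> Result<(), String> {
        // Work on a copy of the node set so a failed patch changes nothing.
        let mut nodes = self.nodes.clone();
        for n in &patch.nodes {
            if !nodes.insert(n.clone()) {
                return Err(format!("duplicate node: {n}"));
            }
        }
        for (from, to) in &patch.edges {
            for end in [from, to] {
                if !nodes.contains(end) {
                    return Err(format!("edge references unknown node: {end}"));
                }
            }
        }
        self.nodes = nodes;
        self.edges.extend(patch.edges);
        Ok(())
    }
}

/// Spawn decision: fan out to two children only when subtasks are needed.
pub fn spawn_children(parent: &str, needs_subtasks: bool) -> ToyPatch {
    if needs_subtasks {
        ToyPatch::new()
            .with_node("child_a")
            .with_node("child_b")
            .with_edge(parent, "child_a")
            .with_edge(parent, "child_b")
    } else {
        ToyPatch::new()
    }
}
```

Returning an empty patch for the "no subtasks" case keeps the spawn function's return type uniform, which mirrors the shape of the `spawn_with` closure above.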

Building the Workflow

use naaf_core::{EdgeSpec, GraphPatch, Workflow};

// `root_spec`, `node_a`, `node_b`, their ids, and `runtime` are assumed to be defined earlier.
let workflow = Workflow::new()
    .with_patch(GraphPatch::new()
        .with_node(root_spec)
        .with_node(node_a)
        .with_node(node_b)
        .with_edge(EdgeSpec::new(root_id, node_a_id))
    )?;

// Run the graph to completion and collect the report.
let report = workflow.run(&runtime).await?;

Scheduling

The runtime schedules nodes when their dependencies complete:

  • Nodes with no dependencies run immediately
  • Nodes with satisfied dependencies run in parallel
  • Patches produced by a node's spawn_with closure are applied when that node completes, which can add new downstream nodes
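The scheduling rules above can be sketched as a loop that repeatedly collects every node whose dependencies are complete. This is a simplified, sequential model using only the standard library, not the naaf_core scheduler. The function name `schedule_waves` and the `spawn` callback (which returns the ids of children to attach to a finished node) are assumptions made for illustration.

```rust
use std::collections::{BTreeMap, BTreeSet};

/// Group nodes into "waves": each wave holds the nodes that become runnable
/// together and that a real runtime could execute in parallel.
/// An edge `(from, to)` means `to` depends on `from`.
pub fn schedule_waves(
    nodes: &[&str],
    edges: &[(&str, &str)],
    spawn: impl Fn(&str) -> Vec<String>,
) -> Vec<Vec<String>> {
    // Map each node to the set of nodes it is waiting on.
    let mut deps: BTreeMap<String, BTreeSet<String>> = nodes
        .iter()
        .map(|n| (n.to_string(), BTreeSet::new()))
        .collect();
    for (from, to) in edges {
        deps.entry(from.to_string()).or_default();
        deps.entry(to.to_string()).or_default().insert(from.to_string());
    }

    let mut done: BTreeSet<String> = BTreeSet::new();
    let mut waves = Vec::new();
    while done.len() < deps.len() {
        // A node is ready when it has not run yet and all its dependencies are done.
        let ready: Vec<String> = deps
            .iter()
            .filter(|(n, d)| !done.contains(*n) && d.is_subset(&done))
            .map(|(n, _)| n.clone())
            .collect();
        if ready.is_empty() {
            break; // the remaining nodes form a cycle and can never run
        }
        for n in &ready {
            done.insert(n.clone());
            // Apply the finished node's "patch": each spawned child depends on it.
            for child in spawn(n.as_str()) {
                deps.entry(child).or_default().insert(n.clone());
            }
        }
        waves.push(ready);
    }
    waves
}
```

In this model a graph containing only a `plan` node whose completion spawns `task_0` and `task_1` yields two waves: the plan node alone, then both tasks together.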

Node Input

Each node receives a NodeInput:

fn extract_input(input: &NodeInput) -> MyType {
    // Get from seed
    input.seed_as::<MyType>()

    // Or, alternatively, from a previous node's output
    // (`node_id` identifies that upstream node):
    // input.get_output::<PreviousOutput>(node_id)
}
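One common way to offer typed accessors like these over heterogeneous values is type erasure with `std::any::Any`. The sketch below shows that technique in isolation. `ToyInput` is a hypothetical type, and returning `Option` on a type mismatch or missing node is an assumption, since the real return types of `seed_as` and `get_output` are not shown here.

```rust
use std::any::Any;
use std::collections::HashMap;

/// Toy stand-in for a node input: one seed value plus upstream outputs,
/// all stored type-erased.
pub struct ToyInput {
    seed: Box<dyn Any>,
    outputs: HashMap<String, Box<dyn Any>>,
}

impl ToyInput {
    /// Create an input carrying the given seed value.
    pub fn new<S: Any>(seed: S) -> Self {
        Self { seed: Box::new(seed), outputs: HashMap::new() }
    }

    /// Record the output of an upstream node under its id.
    pub fn with_output<T: Any>(mut self, node_id: &str, value: T) -> Self {
        self.outputs.insert(node_id.to_string(), Box::new(value));
        self
    }

    /// Return a clone of the seed if it has type `T`, otherwise `None`.
    pub fn seed_as<T: Any + Clone>(&self) -> Option<T> {
        self.seed.downcast_ref::<T>().cloned()
    }

    /// Return a clone of the named node's output if it exists and has type `T`.
    pub fn get_output<T: Any + Clone>(&self, node_id: &str) -> Option<T> {
        self.outputs.get(node_id)?.downcast_ref::<T>().cloned()
    }
}
```

The caller names the expected type with a turbofish, as in `input.seed_as::<String>()`, and a wrong guess surfaces as `None` rather than a panic.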

Example: Dynamic Task Generation

use naaf_core::{EdgeSpec, GraphPatch, NodeInput, Step, StepNode, Workflow}; // assumes `Step` is exported too

let plan_node = StepNode::new(
    Step::builder(PlanTasks).build(),
    |input: &NodeInput| input.seed_as::<ProjectBrief>(),
).spawn_with(|context, plan| {
    let mut patch = GraphPatch::new();

    // Add one execution node per planned task.
    for _task in plan.tasks.iter() {
        let task_node = StepNode::new(
            Step::builder(ExecuteTask)
                .validate(TaskCheck)
                .build(),
            |input: &NodeInput| input.seed_as::<TaskInput>(),
        );
        patch = patch
            .with_node(task_node)
            // `task_node_id` stands for the id of `task_node`; how it is obtained is not shown here.
            .with_edge(EdgeSpec::new(context.node_id, task_node_id));
    }

    patch
});

// The workflow starts with only the planning node; task nodes are added at runtime.
let workflow = Workflow::new()
    .with_patch(GraphPatch::new().with_node(plan_node))?;

let report = workflow.run(&runtime).await?;

Error Handling

The workflow report contains results from all nodes:

let report = workflow.run(&runtime).await?;

// Inspect each node's result individually.
for (node_id, result) in report.node_results() {
    match result {
        Ok(output) => println!("Node {}: {:?}", node_id, output),
        Err(e) => eprintln!("Node {} failed: {:?}", node_id, e),
    }
}
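When a report holds many nodes, it is often more useful to separate successes from failures than to print each result. The following sketch does this over a plain map of per-node `Result` values. The `summarize` function and the map shape are assumptions for illustration and do not reflect the actual report type.

```rust
use std::collections::BTreeMap;

/// Split per-node results into the ids that succeeded and the
/// (id, error message) pairs that failed.
pub fn summarize(
    results: &BTreeMap<String, Result<String, String>>,
) -> (Vec<&str>, Vec<(&str, &str)>) {
    let mut ok = Vec::new();
    let mut failed = Vec::new();
    for (id, result) in results {
        match result {
            Ok(_) => ok.push(id.as_str()),
            Err(e) => failed.push((id.as_str(), e.as_str())),
        }
    }
    (ok, failed)
}
```

A caller could then decide whether any entry in the failure list should abort downstream processing or merely be logged.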

See Also