Workflows

How to structure task flows with nodes and dependencies.

What is a Workflow?

A workflow defines the order in which your agent's tasks run. It is a directed graph without cycles, where:

  • Nodes are processing steps
  • next_nodes define execution order
  • entry_node is where execution starts
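
To make the graph idea concrete, here is a minimal plain-Python sketch that does not use the SDK. It assumes a workflow can be modelled as a dict mapping each node name to its next_nodes; the dict layout and the reachable_from helper are illustrative assumptions, not part of ainalyn.

```python
from collections import deque


def reachable_from(graph, entry_node):
    """Return the node names reachable from entry_node, in breadth-first order."""
    order = []
    seen = {entry_node}
    queue = deque([entry_node])
    while queue:
        name = queue.popleft()
        order.append(name)
        for target in graph[name]:  # follow each next_nodes link once
            if target not in seen:
                seen.add(target)
                queue.append(target)
    return order


# Hypothetical two-node workflow: fetch -> process
graph = {"fetch": ["process"], "process": []}
print(reachable_from(graph, "fetch"))  # ['fetch', 'process']
```

Any node that never appears in the result cannot be reached from the entry node.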

Basic Workflow

from ainalyn import WorkflowBuilder, NodeBuilder
 
workflow = (
    WorkflowBuilder("process-data")
    .description("Fetch and process data")
    .add_node(
        NodeBuilder("fetch")
        .description("Fetch raw data")
        .uses_module("http-client")
        .outputs("raw_data")
        .next_nodes("process")
        .build()
    )
    .add_node(
        NodeBuilder("process")
        .description("Process the data")
        .uses_prompt("analyzer")
        .inputs("raw_data")
        .build()
    )
    .entry_node("fetch")
    .build()
)

Execution flow: fetch → process

Node Dependencies

Each node's next_nodes lists the nodes that run after it; these links determine the execution order.

Linear flow:

# A → B → C
node_a = (
    NodeBuilder("step-a")
    .description("First step")
    .uses_prompt("prompt-a")
    .next_nodes("step-b")
    .build()
)
 
node_b = (
    NodeBuilder("step-b")
    .description("Second step")
    .uses_prompt("prompt-b")
    .next_nodes("step-c")
    .build()
)
 
node_c = (
    NodeBuilder("step-c")
    .description("Final step")
    .uses_prompt("prompt-c")
    .build()
)

Parallel execution:

# A → [B, C] → D
node_a = (
    NodeBuilder("start")
    .description("Start processing")
    .uses_prompt("start-prompt")
    .next_nodes("process-1", "process-2")  # Both run in parallel
    .build()
)
 
node_b = (
    NodeBuilder("process-1")
    .description("First parallel task")
    .uses_prompt("process-prompt-1")
    .next_nodes("merge")
    .build()
)
 
node_c = (
    NodeBuilder("process-2")
    .description("Second parallel task")
    .uses_prompt("process-prompt-2")
    .next_nodes("merge")
    .build()
)
 
node_d = (
    NodeBuilder("merge")
    .description("Merge results")
    .uses_prompt("merge-prompt")
    .build()
)
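
Which nodes are free to run side by side follows from the graph shape. The sketch below is plain Python, independent of the SDK, and groups nodes into "waves" with a level-by-level topological sort (Kahn's algorithm). The dict model and the execution_waves name are assumptions for illustration; how the platform actually schedules nodes is not specified here.

```python
def execution_waves(graph):
    """Group nodes into waves; nodes in the same wave have no ordering constraint."""
    # Count incoming edges for every node.
    indegree = {name: 0 for name in graph}
    for targets in graph.values():
        for target in targets:
            indegree[target] += 1

    # Start with nodes that nothing points to.
    wave = sorted(name for name, count in indegree.items() if count == 0)
    waves = []
    while wave:
        waves.append(wave)
        next_wave = []
        for name in wave:
            for target in graph[name]:
                indegree[target] -= 1
                if indegree[target] == 0:  # all predecessors are done
                    next_wave.append(target)
        wave = sorted(next_wave)
    return waves


# Same shape as the example above: start -> [process-1, process-2] -> merge
graph = {
    "start": ["process-1", "process-2"],
    "process-1": ["merge"],
    "process-2": ["merge"],
    "merge": [],
}
print(execution_waves(graph))
# [['start'], ['process-1', 'process-2'], ['merge']]
```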

Conditional branching:

# A → [B or C]: written the same way as fan-out; the platform picks the path at run time
node_a = (
    NodeBuilder("analyze")
    .description("Analyze input")
    .uses_module("analyzer")
    .next_nodes("path-a", "path-b")  # Platform decides which path
    .build()
)

Entry Node

Every workflow must specify where execution starts.

workflow = (
    WorkflowBuilder("my-workflow")
    .entry_node("start")  # Execution begins here
    .add_node(
        NodeBuilder("start")
        .description("Starting point")
        .build()
    )
    .build()
)

Rules:

  • Entry node must exist in the workflow’s nodes
  • Only one entry node per workflow
  • Entry node name must match exactly

Data Flow

Nodes pass data using inputs and outputs.

# Node 1: Produces data
producer = (
    NodeBuilder("fetch-user")
    .description("Fetch user data")
    .outputs("user_id", "user_name")  # Names of the values this node produces
    .next_nodes("process")
    .build()
)

# Node 2: Consumes data
consumer = (
    NodeBuilder("process")
    .description("Process user data")
    .inputs("user_id", "user_name")  # Names match the producer's outputs
    .outputs("result")
    .build()
)

Notes:

  • Input/output names are descriptive labels that document what each node consumes and produces
  • Platform handles actual data passing
  • SDK just defines the structure
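
Because the names are only labels, a typo in an input name is easy to miss. The following plain-Python sketch shows one way to check that every input is produced by some upstream node. The nodes dict layout and the missing_inputs helper are hypothetical; whether the SDK or the platform performs a similar check is not stated here.

```python
def missing_inputs(nodes):
    """Return {node name: [inputs that no upstream node lists as an output]}."""
    # Build reverse edges: node -> direct predecessors.
    predecessors = {name: [] for name in nodes}
    for name, spec in nodes.items():
        for target in spec["next_nodes"]:
            predecessors[target].append(name)

    def upstream_outputs(name):
        # Collect the outputs of every ancestor of `name`.
        seen, produced = set(), set()
        stack = list(predecessors[name])
        while stack:
            current = stack.pop()
            if current in seen:
                continue
            seen.add(current)
            produced.update(nodes[current]["outputs"])
            stack.extend(predecessors[current])
        return produced

    problems = {}
    for name, spec in nodes.items():
        available = upstream_outputs(name)
        missing = [item for item in spec["inputs"] if item not in available]
        if missing:
            problems[name] = missing
    return problems


nodes = {
    "fetch-user": {"inputs": [], "outputs": ["user_id", "user_name"], "next_nodes": ["process"]},
    "process": {"inputs": ["user_id", "user_name"], "outputs": ["result"], "next_nodes": []},
}
print(missing_inputs(nodes))  # {}
```

An input supplied from outside the workflow, for example to the entry node, would be reported by this sketch and would need to be allowed explicitly.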

Multiple Workflows

Agents can have multiple workflows for different tasks.

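from ainalyn import AgentBuilder, WorkflowBuilder

# Workflow 1: Data processing
# (nodes and entry_node omitted for brevity; a real workflow needs both)
processing_workflow = WorkflowBuilder("process-data").build()

# Workflow 2: Reporting (nodes and entry_node omitted for brevity)
reporting_workflow = WorkflowBuilder("generate-report").build()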
 
# Agent with both workflows
agent = (
    AgentBuilder("data-agent")
    .version("1.0.0")
    .add_workflow(processing_workflow)
    .add_workflow(reporting_workflow)
    .build()
)

Each workflow is independent and can be invoked separately by the platform.

Node Types

Uses Prompt - Uses a prompt template

(
    NodeBuilder("analyze")
    .description("Analyze data")
    .uses_prompt("analyzer-prompt")  # References a Prompt
)

Uses Module - Executes custom logic

(
    NodeBuilder("fetch")
    .description("Fetch data")
    .uses_module("http-client")  # References a Module
)

Uses Tool - Calls an external tool

(
    NodeBuilder("search")
    .description("Search web")
    .uses_tool("web-search")  # References a Tool
)

Note: The node type is automatically determined by which .uses_*() method you call.
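
As a rough illustration of how a builder can infer the type from the method called, here is a toy class. It is not the ainalyn implementation: SketchNodeBuilder, the type strings "prompt", "module" and "tool", and the dict returned by build() are all assumptions made for this sketch.

```python
class SketchNodeBuilder:
    """Toy stand-in for illustration only; not the ainalyn NodeBuilder."""

    def __init__(self, name):
        self._name = name
        self._node_type = None  # set by whichever uses_*() method is called
        self._reference = None

    def _use(self, node_type, reference):
        self._node_type = node_type
        self._reference = reference
        return self  # returning self is what allows method chaining

    def uses_prompt(self, prompt_name):
        return self._use("prompt", prompt_name)

    def uses_module(self, module_name):
        return self._use("module", module_name)

    def uses_tool(self, tool_name):
        return self._use("tool", tool_name)

    def build(self):
        return {
            "name": self._name,
            "type": self._node_type,
            "reference": self._reference,
        }


node = SketchNodeBuilder("search").uses_tool("web-search").build()
print(node["type"])  # tool
```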

Common Patterns

Sequential Processing

# Step 1 → Step 2 → Step 3
workflow = (
    WorkflowBuilder("sequential")
    .description("Sequential workflow")
    .add_node(
        NodeBuilder("step1")
        .description("First")
        .uses_prompt("prompt1")
        .next_nodes("step2")
        .build()
    )
    .add_node(
        NodeBuilder("step2")
        .description("Second")
        .uses_prompt("prompt2")
        .next_nodes("step3")
        .build()
    )
    .add_node(
        NodeBuilder("step3")
        .description("Final")
        .uses_prompt("prompt3")
        .build()
    )
    .entry_node("step1")
    .build()
)

Fan-Out, Fan-In

# One node → Multiple parallel nodes → Merge
workflow = (
    WorkflowBuilder("parallel")
    .description("Parallel workflow")
    .add_node(
        NodeBuilder("split")
        .description("Split work")
        .uses_prompt("split-prompt")
        .next_nodes("work1", "work2", "work3")
        .build()
    )
    .add_node(
        NodeBuilder("work1")
        .description("Work 1")
        .uses_prompt("work-prompt-1")
        .next_nodes("merge")
        .build()
    )
    .add_node(
        NodeBuilder("work2")
        .description("Work 2")
        .uses_prompt("work-prompt-2")
        .next_nodes("merge")
        .build()
    )
    .add_node(
        NodeBuilder("work3")
        .description("Work 3")
        .uses_prompt("work-prompt-3")
        .next_nodes("merge")
        .build()
    )
    .add_node(
        NodeBuilder("merge")
        .description("Combine results")
        .uses_prompt("merge-prompt")
        .build()
    )
    .entry_node("split")
    .build()
)

Pipeline Processing

# Input → Transform → Filter → Output
workflow = (
    WorkflowBuilder("pipeline")
    .description("Pipeline workflow")
    .add_node(
        NodeBuilder("input")
        .description("Read input")
        .uses_module("input-reader")
        .next_nodes("transform")
        .outputs("raw_data")
        .build()
    )
    .add_node(
        NodeBuilder("transform")
        .description("Transform data")
        .uses_prompt("transform-prompt")
        .inputs("raw_data")
        .next_nodes("filter")
        .outputs("transformed_data")
        .build()
    )
    .add_node(
        NodeBuilder("filter")
        .description("Filter data")
        .uses_prompt("filter-prompt")
        .inputs("transformed_data")
        .next_nodes("output")
        .outputs("filtered_data")
        .build()
    )
    .add_node(
        NodeBuilder("output")
        .description("Write output")
        .uses_module("output-writer")
        .inputs("filtered_data")
        .build()
    )
    .entry_node("input")
    .build()
)

Validation Rules

Valid workflows:

  • At least one node
  • Entry node exists in nodes
  • All next_nodes references exist
  • No circular dependencies (A → B → A)

Invalid workflows:

# No nodes
workflow = WorkflowBuilder("empty").build()
 
# Entry node doesn't exist
workflow = (
    WorkflowBuilder("bad")
    .entry_node("missing")  # No node named "missing"
    .add_node(NodeBuilder("actual").build())
    .build()
)
 
# Undefined next_node reference
node = (
    NodeBuilder("broken")
    .next_nodes("nonexistent")  # No node named "nonexistent"
    .build()
)
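
The four rules can be expressed as a small standalone checker. This is a plain-Python sketch over a dict of node name to next_nodes, not the SDK's validator; the function names and error messages are made up for illustration.

```python
def has_cycle(graph):
    """Detect a circular dependency with a depth-first search."""
    UNVISITED, IN_PROGRESS, DONE = 0, 1, 2
    state = {name: UNVISITED for name in graph}

    def visit(name):
        state[name] = IN_PROGRESS
        for target in graph[name]:
            if target not in graph:
                continue  # undefined references are reported separately
            if state[target] == IN_PROGRESS:
                return True  # reached a node that is still on the current path
            if state[target] == UNVISITED and visit(target):
                return True
        state[name] = DONE
        return False

    return any(state[name] == UNVISITED and visit(name) for name in graph)


def validate_workflow(graph, entry_node):
    """Return a list of rule violations; an empty list means the graph passes."""
    errors = []
    if not graph:
        errors.append("workflow has no nodes")
    if entry_node not in graph:
        errors.append(f"entry node {entry_node!r} is not defined")
    for name, targets in graph.items():
        for target in targets:
            if target not in graph:
                errors.append(f"node {name!r} references undefined node {target!r}")
    if has_cycle(graph):
        errors.append("workflow contains a circular dependency")
    return errors


print(validate_workflow({"fetch": ["process"], "process": []}, "fetch"))  # []
print(validate_workflow({"a": ["b"], "b": ["a"]}, "a"))
# ['workflow contains a circular dependency']
```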

Best Practices

1. Use descriptive names

# Clear names
NodeBuilder("fetch-user-data")
NodeBuilder("validate-email")
NodeBuilder("send-notification")
 
# Unclear names
NodeBuilder("step1")
NodeBuilder("process")
NodeBuilder("do-stuff")

2. Define clear data flow

# Explicit inputs/outputs
(
    NodeBuilder("transform")
    .inputs("raw_data")
    .outputs("clean_data")
)

# No data flow specified
NodeBuilder("transform")  # Where does the data come from?

3. Keep workflows focused

# Single responsibility
process_workflow = WorkflowBuilder("process-orders")
report_workflow = WorkflowBuilder("generate-reports")
 
# Too many responsibilities
everything_workflow = WorkflowBuilder("do-everything")

See Also