Workflows
How to structure task flows with nodes and dependencies.
What is a Workflow?
A workflow defines the execution flow of tasks in your agent. It's a directed acyclic graph (cycles are rejected by validation) where:
- Nodes are processing steps
- next_nodes define execution order
- entry_node is where execution starts
Basic Workflow
```python
from ainalyn import WorkflowBuilder, NodeBuilder

workflow = (
    WorkflowBuilder("process-data")
    .description("Fetch and process data")
    .add_node(
        NodeBuilder("fetch")
        .description("Fetch raw data")
        .uses_module("http-client")
        .outputs("raw_data")
        .next_nodes("process")
        .build()
    )
    .add_node(
        NodeBuilder("process")
        .description("Process the data")
        .uses_prompt("analyzer")
        .inputs("raw_data")
        .build()
    )
    .entry_node("fetch")
    .build()
)
```

Execution flow: fetch → process
Node Dependencies
Dependencies define execution order through next_nodes.
Linear flow:
```python
# A → B → C
node_a = (
    NodeBuilder("step-a")
    .description("First step")
    .uses_prompt("prompt-a")
    .next_nodes("step-b")
    .build()
)
node_b = (
    NodeBuilder("step-b")
    .description("Second step")
    .uses_prompt("prompt-b")
    .next_nodes("step-c")
    .build()
)
node_c = (
    NodeBuilder("step-c")
    .description("Final step")
    .uses_prompt("prompt-c")
    .build()
)
```

Parallel execution:
```python
# A → [B, C] → D
node_a = (
    NodeBuilder("start")
    .description("Start processing")
    .uses_prompt("start-prompt")
    .next_nodes("process-1", "process-2")  # Both run in parallel
    .build()
)
node_b = (
    NodeBuilder("process-1")
    .description("First parallel task")
    .uses_prompt("process-prompt-1")
    .next_nodes("merge")
    .build()
)
node_c = (
    NodeBuilder("process-2")
    .description("Second parallel task")
    .uses_prompt("process-prompt-2")
    .next_nodes("merge")
    .build()
)
node_d = (
    NodeBuilder("merge")
    .description("Merge results")
    .uses_prompt("merge-prompt")
    .build()
)
```

Conditional branching:
```python
# A → [B or C] (decided by platform)
node_a = (
    NodeBuilder("analyze")
    .description("Analyze input")
    .uses_module("analyzer")
    .next_nodes("path-a", "path-b")  # Platform decides which path
    .build()
)
```

Entry Node
Every workflow must specify where execution starts.
```python
workflow = (
    WorkflowBuilder("my-workflow")
    .entry_node("start")  # Execution begins here
    .add_node(
        NodeBuilder("start")
        .description("Starting point")
        .build()
    )
    .build()
)
```

Rules:
- Entry node must exist in the workflow’s nodes
- Only one entry node per workflow
- Entry node name must match exactly
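Checked in plain Python over a toy representation (a dict with `entry_node` and `nodes` keys; these shapes are illustrative, not the SDK's internal model), the rules look like this:

```python
def check_entry_node(wf):
    """Return True if the entry node names an existing node (exact match)."""
    return wf["entry_node"] in wf["nodes"]

# The entry node must match a node name exactly; names are case-sensitive.
print(check_entry_node({"entry_node": "start", "nodes": {"start": {}}}))  # True
print(check_entry_node({"entry_node": "Start", "nodes": {"start": {}}}))  # False
```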
Data Flow
Nodes pass data using inputs and outputs.
```python
# Node 1: Produces data
producer = (
    NodeBuilder("fetch-user")
    .description("Fetch user data")
    .outputs(["user_id", "user_name"])  # Outputs these values
    .next_nodes(["process"])
    .build()
)

# Node 2: Consumes data
consumer = (
    NodeBuilder("process")
    .description("Process user data")
    .inputs(["user_id", "user_name"])  # Uses these inputs
    .outputs(["result"])
    .build()
)
```

Notes:
- Input/output names are descriptive
- Platform handles actual data passing
- SDK just defines the structure
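As a rough illustration of what "the platform handles actual data passing" means, here is a toy runner that routes declared outputs to declared inputs through a shared context (the dict shapes and the runner are hypothetical, not platform code):

```python
# Each node declares inputs/outputs by name; a shared context carries values.
nodes = {
    "fetch-user": {"inputs": [], "outputs": ["user_id", "user_name"], "next": ["process"]},
    "process": {"inputs": ["user_id", "user_name"], "outputs": ["result"], "next": []},
}

def run(node_name, context):
    node = nodes[node_name]
    # Gather declared inputs from the shared context by name.
    args = {name: context[name] for name in node["inputs"]}
    # Stand-in for real execution: produce one value per declared output.
    for out in node["outputs"]:
        context[out] = f"{node_name}:{out}"
    for nxt in node["next"]:
        run(nxt, context)

context = {}
run("fetch-user", context)
print(context["result"])  # "process:result"
```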
Multiple Workflows
Agents can have multiple workflows for different tasks.
```python
from ainalyn import AgentBuilder

# Workflow 1: Data processing (nodes omitted for brevity)
processing_workflow = WorkflowBuilder("process-data").build()

# Workflow 2: Reporting (nodes omitted for brevity)
reporting_workflow = WorkflowBuilder("generate-report").build()

# Agent with both workflows
agent = (
    AgentBuilder("data-agent")
    .version("1.0.0")
    .add_workflow(processing_workflow)
    .add_workflow(reporting_workflow)
    .build()
)
```

Each workflow is independent and can be invoked separately by the platform.
Node Types
Uses Prompt - Uses a prompt template

```python
NodeBuilder("analyze")
    .description("Analyze data")
    .uses_prompt("analyzer-prompt")  # References a Prompt
```

Uses Module - Executes custom logic

```python
NodeBuilder("fetch")
    .description("Fetch data")
    .uses_module("http-client")  # References a Module
```

Uses Tool - Calls an external tool

```python
NodeBuilder("search")
    .description("Search web")
    .uses_tool("web-search")  # References a Tool
```

Note: The node type is automatically determined by which .uses_*() method you call.
Common Patterns
Sequential Processing
```python
# Step 1 → Step 2 → Step 3
workflow = (
    WorkflowBuilder("sequential")
    .description("Sequential workflow")
    .add_node(
        NodeBuilder("step1")
        .description("First")
        .uses_prompt("prompt1")
        .next_nodes("step2")
        .build()
    )
    .add_node(
        NodeBuilder("step2")
        .description("Second")
        .uses_prompt("prompt2")
        .next_nodes("step3")
        .build()
    )
    .add_node(
        NodeBuilder("step3")
        .description("Final")
        .uses_prompt("prompt3")
        .build()
    )
    .entry_node("step1")
    .build()
)
```

Fan-Out, Fan-In
```python
# One node → Multiple parallel nodes → Merge
workflow = (
    WorkflowBuilder("parallel")
    .description("Parallel workflow")
    .add_node(
        NodeBuilder("split")
        .description("Split work")
        .uses_prompt("split-prompt")
        .next_nodes("work1", "work2", "work3")
        .build()
    )
    .add_node(
        NodeBuilder("work1")
        .description("Work 1")
        .uses_prompt("work-prompt-1")
        .next_nodes("merge")
        .build()
    )
    .add_node(
        NodeBuilder("work2")
        .description("Work 2")
        .uses_prompt("work-prompt-2")
        .next_nodes("merge")
        .build()
    )
    .add_node(
        NodeBuilder("work3")
        .description("Work 3")
        .uses_prompt("work-prompt-3")
        .next_nodes("merge")
        .build()
    )
    .add_node(
        NodeBuilder("merge")
        .description("Combine results")
        .uses_prompt("merge-prompt")
        .build()
    )
    .entry_node("split")
    .build()
)
```

Pipeline Processing
```python
# Input → Transform → Filter → Output
workflow = (
    WorkflowBuilder("pipeline")
    .description("Pipeline workflow")
    .add_node(
        NodeBuilder("input")
        .description("Read input")
        .uses_module("input-reader")
        .next_nodes("transform")
        .outputs("raw_data")
        .build()
    )
    .add_node(
        NodeBuilder("transform")
        .description("Transform data")
        .uses_prompt("transform-prompt")
        .inputs("raw_data")
        .next_nodes("filter")
        .outputs("transformed_data")
        .build()
    )
    .add_node(
        NodeBuilder("filter")
        .description("Filter data")
        .uses_prompt("filter-prompt")
        .inputs("transformed_data")
        .next_nodes("output")
        .outputs("filtered_data")
        .build()
    )
    .add_node(
        NodeBuilder("output")
        .description("Write output")
        .uses_module("output-writer")
        .inputs("filtered_data")
        .build()
    )
    .entry_node("input")
    .build()
)
```

Validation Rules
Valid workflows:
- At least one node
- Entry node exists in nodes
- All next_nodes references exist
- No circular dependencies (A → B → A)
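These checks can be sketched independently of the SDK over a plain mapping from node name to its next_nodes list (a minimal illustration, not the SDK's actual validator):

```python
def validate(nodes, entry):
    """nodes: dict mapping node name -> list of next_nodes names."""
    if not nodes:
        return "no nodes"
    if entry not in nodes:
        return "entry node missing"
    for name, nexts in nodes.items():
        for nxt in nexts:
            if nxt not in nodes:
                return f"undefined next_node: {nxt}"
    # Detect cycles with a depth-first search over next_nodes edges.
    visiting, done = set(), set()
    def dfs(name):
        if name in done:
            return True
        if name in visiting:
            return False  # back edge -> cycle
        visiting.add(name)
        ok = all(dfs(n) for n in nodes[name])
        visiting.discard(name)
        done.add(name)
        return ok
    if not all(dfs(n) for n in nodes):
        return "circular dependency"
    return "ok"

print(validate({"a": ["b"], "b": []}, "a"))     # ok
print(validate({"a": ["b"], "b": ["a"]}, "a"))  # circular dependency
```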
Invalid workflows:
```python
# No nodes
workflow = WorkflowBuilder("empty").build()

# Entry node doesn't exist
workflow = (
    WorkflowBuilder("bad")
    .entry_node("missing")  # No node named "missing"
    .add_node(NodeBuilder("actual").build())
    .build()
)

# Undefined next_node reference
node = (
    NodeBuilder("broken")
    .next_nodes(["nonexistent"])  # No node named "nonexistent"
    .build()
)
```

Best Practices
1. Use descriptive names
```python
# Clear names
NodeBuilder("fetch-user-data")
NodeBuilder("validate-email")
NodeBuilder("send-notification")

# Unclear names
NodeBuilder("step1")
NodeBuilder("process")
NodeBuilder("do-stuff")
```

2. Define clear data flow
```python
# Explicit inputs/outputs
NodeBuilder("transform")
    .inputs(["raw_data"])
    .outputs(["clean_data"])

# No data flow specified
NodeBuilder("transform")
# Where does data come from?
```

3. Keep workflows focused
```python
# Single responsibility
process_workflow = WorkflowBuilder("process-orders")
report_workflow = WorkflowBuilder("generate-reports")

# Too many responsibilities
everything_workflow = WorkflowBuilder("do-everything")
```

See Also
- Your First Agent - Complete workflow example
- NodeBuilder API - All node options
- Validation - Workflow validation rules