> ## Documentation Index
> Fetch the complete documentation index at: https://wavefront.rootflo.ai/llms.txt
> Use this file to discover all available pages before exploring further.

# Arium Workflows

> Create complex multi-agent workflows with Arium orchestration

## What is Arium?

Arium is Flo AI's powerful workflow orchestration engine for creating complex multi-agent workflows. It allows you to chain agents together, implement conditional routing, and build sophisticated AI systems.

### AriumBuilder Methods

The `AriumBuilder` class provides a fluent interface for configuring workflows. All methods return `self` for method chaining. Here's a complete reference:

| Method                                                                                | Description                             | Parameters                                                                                        |
| ------------------------------------------------------------------------------------- | --------------------------------------- | ------------------------------------------------------------------------------------------------- |
| `with_memory(memory: MessageMemory)`                                                  | Set shared memory for the workflow      | `memory`: MessageMemory instance                                                                  |
| `add_agent(agent: Agent)`                                                             | Add a single agent to the workflow      | `agent`: Agent instance                                                                           |
| `add_agents(agents: List[Agent])`                                                     | Add multiple agents to the workflow     | `agents`: List of Agent instances                                                                 |
| `add_function_node(node: FunctionNode)`                                               | Add a function node to the workflow     | `node`: FunctionNode instance                                                                     |
| `add_function_nodes(nodes: List[FunctionNode])`                                       | Add multiple function nodes             | `nodes`: List of FunctionNode instances                                                           |
| `add_arium(arium: Arium, name: str, inherit_variables: bool)`                         | Add a nested Arium workflow as a node   | `arium`: Arium instance, `name`: Optional name, `inherit_variables`: Whether to inherit variables |
| `add_foreach(name: str, execute_node: AriumNodeType)`                                 | Add a ForEach node for batch processing | `name`: Node name, `execute_node`: Node to execute on each item                                   |
| `start_with(node: AriumNodeType \| str)`                                              | Set the starting node                   | `node`: Agent, FunctionNode, AriumNode, or node name string                                       |
| `end_with(node: AriumNodeType)`                                                       | Add an ending node                      | `node`: Agent, FunctionNode, or AriumNode                                                         |
| `connect(from_node: AriumNodeType, to_node: AriumNodeType)`                           | Connect two nodes directly              | `from_node`: Source node, `to_node`: Target node                                                  |
| `add_edge(from_node: AriumNodeType, to_nodes: List[AriumNodeType], router: Callable)` | Add edge with optional router function  | `from_node`: Source node, `to_nodes`: List of target nodes, `router`: Optional routing function   |
| `from_yaml(yaml_str: str, yaml_file: str, ...)`                                       | Create builder from YAML configuration  | `yaml_str`: YAML string, `yaml_file`: Path to YAML file, plus optional registries                 |
| `build()`                                                                             | Build and return the Arium instance     | Returns: `Arium` instance                                                                         |
| `build_and_run(inputs, variables: Dict)`                                              | Build and run the workflow              | `inputs`: List of messages or string, `variables`: Optional runtime variables                     |
| `visualize(output_path: str, title: str)`                                             | Generate workflow visualization         | `output_path`: Path for graph image, `title`: Graph title                                         |
| `reset()`                                                                             | Reset builder to start fresh            | Returns: `AriumBuilder` instance                                                                  |

**Note:** `start_with()` and `end_with()` are required before calling `build()`. All other methods are optional.

## Node Types

Arium workflows support several types of nodes, each serving different purposes:

### Agent Nodes

Agents are the primary executable nodes in Arium workflows. They use LLMs to process inputs and generate responses.

```python theme={null}
from flo_ai.agent import AgentBuilder
from flo_ai.llm import OpenAI

agent = (
    AgentBuilder()
    .with_name('analyzer')
    .with_prompt('Analyze the input content.')
    .with_llm(OpenAI(model='gpt-4o-mini'))
    .build()
)

# Use agent in workflow
workflow = (
    AriumBuilder()
    .add_agent(agent)
    .start_with(agent)
    .end_with(agent)
)
```

### Function Nodes

Function nodes allow you to execute custom Python functions within workflows. They can be synchronous or asynchronous.

```python theme={null}
from flo_ai.arium.nodes import FunctionNode

# Synchronous function
def process_data(inputs, variables=None, **kwargs):
    # Process inputs
    result = f"Processed: {inputs}"
    return result

# Asynchronous function
async def async_process(inputs, variables=None, **kwargs):
    # Async processing
    await asyncio.sleep(0.1)
    return f"Async processed: {inputs}"

# Create function nodes
sync_node = FunctionNode(
    name='data_processor',
    description='Processes input data',
    function=process_data
)

async_node = FunctionNode(
    name='async_processor',
    description='Asynchronously processes data',
    function=async_process
)

# Use in workflow
workflow = (
    AriumBuilder()
    .add_function_node(sync_node)
    .add_function_node(async_node)
    .start_with(sync_node)
    .connect(sync_node, async_node)
    .end_with(async_node)
)
```

### Arium Nodes (Nested Workflows)

Arium nodes allow you to embed one workflow inside another, creating hierarchical workflows with isolated memory.

```python theme={null}
# Create a sub-workflow
sub_workflow = (
    AriumBuilder()
    .add_agents([agent_a, agent_b])
    .start_with(agent_a)
    .connect(agent_a, agent_b)
    .end_with(agent_b)
    .build()
)

# Embed as a node in parent workflow
from flo_ai.arium.nodes import AriumNode

nested_node = AriumNode(
    name='sub_workflow',
    arium=sub_workflow,
    inherit_variables=True  # Pass parent variables to sub-workflow
)

# Use in parent workflow
parent_workflow = (
    AriumBuilder()
    .add_agent(main_agent)
    .add_arium(sub_workflow, name='sub_workflow', inherit_variables=True)
    .start_with(main_agent)
    .connect(main_agent, nested_node)
    .end_with(nested_node)
)
```

### ForEach Nodes

ForEach nodes execute a node on each item in a collection, useful for batch processing.

```python theme={null}
from flo_ai.arium.nodes import ForEachNode

# Create a ForEach node that processes each item with an agent
foreach_node = ForEachNode(
    name='batch_processor',
    execute_node=agent  # Agent to execute on each item
)

# Use in workflow
workflow = (
    AriumBuilder()
    .add_agent(agent)
    .add_foreach('batch_processor', agent)
    .start_with(foreach_node)
    .end_with(foreach_node)
)

# When run with multiple inputs, each input is processed sequentially
result = await workflow.run([
    "Process item 1",
    "Process item 2",
    "Process item 3"
])
```

### Combining Node Types

You can combine different node types in a single workflow:

```python theme={null}
# Create different node types
agent = AgentBuilder().with_name('agent').with_prompt('...').with_llm(llm).build()
function_node = FunctionNode(name='processor', description='...', function=process_func)
sub_arium = AriumBuilder().add_agents([...]).build()
nested_node = AriumNode(name='nested', arium=sub_arium)
foreach_node = ForEachNode(name='batch', execute_node=agent)

# Combine in workflow
workflow = (
    AriumBuilder()
    .add_agent(agent)
    .add_function_node(function_node)
    .add_arium(sub_arium, name='nested')
    .add_foreach('batch', agent)
    .start_with(agent)
    .connect(agent, function_node)
    .connect(function_node, nested_node)
    .connect(nested_node, foreach_node)
    .end_with(foreach_node)
)
```

## Basic Workflow Creation

### Simple Agent Chain

Create a linear workflow with multiple agents:

```python theme={null}
from flo_ai.arium import AriumBuilder
from flo_ai.agent import AgentBuilder
from flo_ai.llm import OpenAI

async def simple_chain():
    llm = OpenAI(model='gpt-4o-mini')

    # Create agents using AgentBuilder
    analyst = (
        AgentBuilder()
        .with_name('content_analyst')
        .with_prompt('Analyze the input and extract key insights.')
        .with_llm(llm)
        .build()
    )

    summarizer = (
        AgentBuilder()
        .with_name('summarizer')
        .with_prompt('Create a concise summary based on the analysis.')
        .with_llm(llm)
        .build()
    )

    # Build and run workflow
    result = await (
        AriumBuilder()
        .add_agents([analyst, summarizer])
        .start_with(analyst)
        .connect(analyst, summarizer)
        .end_with(summarizer)
        .build_and_run(["Analyze this complex business report..."])
    )

    return result
```

### Conditional Routing

Route to different agents based on conditions:

```python theme={null}
from flo_ai.arium.memory import MessageMemory, MessageMemoryItem
from typing import List

def route_by_type(memory: MessageMemory) -> str:
    """Route based on classification result"""
    messages: List[MessageMemoryItem] = memory.get()
    
    if not messages:
        return "business_specialist"  # Default route
    
    # Access the last message result
    last_message_item = messages[-1]
    last_message_content = str(last_message_item.result)
    
    if "technical" in last_message_content.lower():
        return "tech_specialist"
    else:
        return "business_specialist"

# Build workflow with conditional routing
result = await (
    AriumBuilder()
    .add_agents([classifier, tech_specialist, business_specialist, final_agent])
    .start_with(classifier)
    .add_edge(classifier, [tech_specialist, business_specialist], route_by_type)
    .connect(tech_specialist, final_agent)
    .connect(business_specialist, final_agent)
    .end_with(final_agent)
    .build_and_run(["How can we optimize our database performance?"])
)
```

## YAML-Based Workflows

Define entire workflows in YAML for easy management:

```yaml theme={null}
metadata:
  name: "content-analysis-workflow"
  version: "1.0.0"
  description: "Multi-agent content analysis pipeline"

arium:
  agents:
    - name: "analyzer"
      role: "Content Analyst"
      job: "Analyze the input content and extract key insights."
      model:
        provider: "openai"
        name: "gpt-4o-mini"

    - name: "summarizer"
      role: "Content Summarizer"
      job: "Create a concise summary based on the analysis."
      model:
        provider: "anthropic"
        name: "claude-3-5-sonnet-20240620"

  workflow:
    start: "analyzer"
    edges:
      - from: "analyzer"
        to: ["summarizer"]
    end: ["summarizer"]
```

```python theme={null}
# Run YAML workflow
result = await (
    AriumBuilder()
    .from_yaml(yaml_file='workflow.yaml')
    .build_and_run(["Analyze this quarterly business report..."])
)
```

## Advanced Routing

### LLM-Powered Routers

Use LLMs for intelligent routing decisions:

```yaml theme={null}
routers:
  - name: "content_type_router"
    type: "smart" # Uses LLM for intelligent routing
    routing_options:
      technical_writer: "Technical content, documentation, tutorials"
      creative_writer: "Creative writing, storytelling, fiction"
      marketing_writer: "Marketing copy, sales content, campaigns"
    model:
      provider: "openai"
      name: "gpt-4o-mini"
```

### ReflectionRouter

For A→B→A→C feedback patterns:

```yaml theme={null}
routers:
  - name: "reflection_router"
    type: "reflection"
    flow_pattern: [writer, critic, writer] # A → B → A pattern
    model:
      provider: "openai"
      name: "gpt-4o-mini"
```

### PlanExecuteRouter

For Cursor-style plan-and-execute workflows:

```yaml theme={null}
routers:
  - name: "plan_router"
    type: "plan_execute"
  agents:
    planner: "Creates detailed execution plans"
    developer: "Implements features according to plan"
    tester: "Tests implementations and validates functionality"
    reviewer: "Reviews and approves completed work"
  settings:
    planner_agent: planner
    executor_agent: developer
    reviewer_agent: reviewer
```

## Workflow Patterns

### Sequential Processing

```python theme={null}
# A → B → C
workflow = (
    AriumBuilder()
    .add_agents([agent_a, agent_b, agent_c])
    .start_with(agent_a)
    .connect(agent_a, agent_b)
    .connect(agent_b, agent_c)
    .end_with(agent_c)
)
```

### Parallel Processing

```python theme={null}
# A → [B, C] → D
workflow = (
    AriumBuilder()
    .add_agents([agent_a, agent_b, agent_c, agent_d])
    .start_with(agent_a)
    .connect(agent_a, [agent_b, agent_c])
    .connect(agent_b, agent_d)
    .connect(agent_c, agent_d)
    .end_with(agent_d)
)
```

### Fan-out/Fan-in

```python theme={null}
# A → [B, C, D] → E
workflow = (
    AriumBuilder()
    .add_agents([agent_a, agent_b, agent_c, agent_d, agent_e])
    .start_with(agent_a)
    .connect(agent_a, [agent_b, agent_c, agent_d])
    .connect(agent_b, agent_e)
    .connect(agent_c, agent_e)
    .connect(agent_d, agent_e)
    .end_with(agent_e)
)
```

## Memory Management

### Shared Memory

```python theme={null}
from flo_ai.arium.memory import MessageMemory

# Create shared memory
shared_memory = MessageMemory()

workflow = (
    AriumBuilder()
    .add_agents([agent_a, agent_b])
    .with_memory(shared_memory)
    .start_with(agent_a)
    .connect(agent_a, agent_b)
    .end_with(agent_b)
)
```

### Custom Memory

You can extend `MessageMemory` to add custom functionality:

```python theme={null}
from flo_ai.arium.memory import MessageMemory, MessageMemoryItem
from typing import List, Optional

class CustomMemory(MessageMemory):
    def __init__(self):
        super().__init__()
        # Add any custom attributes here
        self.custom_data = {}

    def get_filtered_by_custom_logic(self, filter_func) -> List[MessageMemoryItem]:
        """Custom method to filter messages"""
        return [msg for msg in self.messages if filter_func(msg)]

# Use custom memory in workflow
custom_memory = CustomMemory()
workflow = (
    AriumBuilder()
    .add_agents([agent_a, agent_b])
    .with_memory(custom_memory)
    .start_with(agent_a)
    .connect(agent_a, agent_b)
    .end_with(agent_b)
)
```

## Workflow Execution

### Running Workflows

Workflows can be executed using `build_and_run()` or by building first and then running:

```python theme={null}
# Build and run in one step
result = await (
    AriumBuilder()
    .add_agents([agent_a, agent_b])
    .start_with(agent_a)
    .connect(agent_a, agent_b)
    .end_with(agent_b)
    .build_and_run(["Process this input"])
)

# Or build first, then run
workflow = (
    AriumBuilder()
    .add_agents([agent_a, agent_b])
    .start_with(agent_a)
    .connect(agent_a, agent_b)
    .end_with(agent_b)
    .build()
)

result = await workflow.run(["Process this input"])

# With variables
result = await workflow.run(
    ["Process this input"],
    variables={'user_id': '123', 'context': 'production'}
)
```

## Error Handling

Error handling in Arium workflows is managed at the agent level. Configure retries and error handling when building agents:

```python theme={null}
from flo_ai.agent import AgentBuilder
from flo_ai.llm import OpenAI

# Create agents with retry configuration
agent_a = (
    AgentBuilder()
    .with_name('agent_a')
    .with_prompt('Process the input.')
    .with_llm(OpenAI(model='gpt-4o-mini'))
    .with_retries(3)  # Retry up to 3 times on failure
    .build()
)

agent_b = (
    AgentBuilder()
    .with_name('agent_b')
    .with_prompt('Continue processing.')
    .with_llm(OpenAI(model='gpt-4o-mini'))
    .with_retries(2)  # Retry up to 2 times on failure
    .build()
)

# Build workflow with error-resilient agents
workflow = (
    AriumBuilder()
    .add_agents([agent_a, agent_b])
    .start_with(agent_a)
    .connect(agent_a, agent_b)
    .end_with(agent_b)
    .build()
)
```

## Performance Optimization

### Parallel Execution

Arium automatically executes agents in parallel when multiple agents are connected from the same source node:

```python theme={null}
# Agents B and C will execute in parallel after A completes
workflow = (
    AriumBuilder()
    .add_agents([agent_a, agent_b, agent_c, agent_d])
    .start_with(agent_a)
    .add_edge(agent_a, [agent_b, agent_c])  # Parallel execution
    .connect(agent_b, agent_d)
    .connect(agent_c, agent_d)
    .end_with(agent_d)
)
```

### Workflow Visualization

Visualize your workflow to understand the execution flow:

```python theme={null}
workflow = (
    AriumBuilder()
    .add_agents([agent_a, agent_b, agent_c])
    .start_with(agent_a)
    .connect(agent_a, agent_b)
    .connect(agent_b, agent_c)
    .end_with(agent_c)
    .visualize(output_path='workflow.png', title='My Workflow')
    .build()
)
```

## Best Practices

### Workflow Design

1. **Keep it simple**: Start with linear workflows before adding complexity
2. **Use meaningful names**: Name agents and workflows descriptively
3. **Handle errors**: Always implement error handling and recovery
4. **Test thoroughly**: Test workflows with various inputs

### Performance Tips

1. **Use appropriate models**: Choose models based on task complexity
2. **Implement caching**: Cache expensive operations
3. **Optimize routing**: Use efficient routing logic
4. **Monitor performance**: Use telemetry to track workflow performance

### Debugging

Enable debug logging at the Python level to troubleshoot workflows:

```python theme={null}
import logging

# Enable debug logging
logging.basicConfig(level=logging.DEBUG)

# Build and run workflow - debug logs will show execution flow
workflow = (
    AriumBuilder()
    .add_agents([agent_a, agent_b])
    .start_with(agent_a)
    .connect(agent_a, agent_b)
    .end_with(agent_b)
    .build()
)

result = await workflow.run(["Test input"])
```
