
What is Arium?

Arium is Flo AI’s powerful workflow orchestration engine for creating complex multi-agent workflows. It allows you to chain agents together, implement conditional routing, and build sophisticated AI systems.

AriumBuilder Methods

The AriumBuilder class provides a fluent interface for configuring workflows. All methods return self for method chaining. Here’s a complete reference:
| Method | Description | Parameters |
|---|---|---|
| `with_memory(memory: MessageMemory)` | Set shared memory for the workflow | `memory`: MessageMemory instance |
| `add_agent(agent: Agent)` | Add a single agent to the workflow | `agent`: Agent instance |
| `add_agents(agents: List[Agent])` | Add multiple agents to the workflow | `agents`: list of Agent instances |
| `add_function_node(node: FunctionNode)` | Add a function node to the workflow | `node`: FunctionNode instance |
| `add_function_nodes(nodes: List[FunctionNode])` | Add multiple function nodes | `nodes`: list of FunctionNode instances |
| `add_arium(arium: Arium, name: str, inherit_variables: bool)` | Add a nested Arium workflow as a node | `arium`: Arium instance; `name`: optional name; `inherit_variables`: whether to inherit variables |
| `add_foreach(name: str, execute_node: AriumNodeType)` | Add a ForEach node for batch processing | `name`: node name; `execute_node`: node to execute on each item |
| `start_with(node: AriumNodeType \| str)` | Set the starting node | `node`: Agent, FunctionNode, AriumNode, or node name string |
| `end_with(node: AriumNodeType)` | Add an ending node | `node`: Agent, FunctionNode, or AriumNode |
| `connect(from_node: AriumNodeType, to_node: AriumNodeType)` | Connect two nodes directly | `from_node`: source node; `to_node`: target node |
| `add_edge(from_node: AriumNodeType, to_nodes: List[AriumNodeType], router: Callable)` | Add an edge with an optional router function | `from_node`: source node; `to_nodes`: list of target nodes; `router`: optional routing function |
| `from_yaml(yaml_str: str, yaml_file: str, ...)` | Create a builder from YAML configuration | `yaml_str`: YAML string; `yaml_file`: path to a YAML file; plus optional registries |
| `build()` | Build and return the Arium instance | Returns: Arium instance |
| `build_and_run(inputs, variables: Dict)` | Build and run the workflow | `inputs`: list of messages or a string; `variables`: optional runtime variables |
| `visualize(output_path: str, title: str)` | Generate a workflow visualization | `output_path`: path for the graph image; `title`: graph title |
| `reset()` | Reset the builder to start fresh | Returns: AriumBuilder instance |
Note: start_with() and end_with() are required before calling build(). All other methods are optional.
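The chaining that the table relies on works because every configuration method returns `self`. As a plain-Python illustration of that fluent pattern (a toy sketch, not Arium's actual implementation), including the `start_with()`/`end_with()` requirement noted above:

```python
class MiniBuilder:
    """Toy builder illustrating the return-self fluent pattern (not the real AriumBuilder)."""

    def __init__(self):
        self.nodes = []
        self.start = None
        self.ends = []

    def add_node(self, node):
        self.nodes.append(node)
        return self  # returning self is what enables method chaining

    def start_with(self, node):
        self.start = node
        return self

    def end_with(self, node):
        self.ends.append(node)
        return self

    def build(self):
        # mirror AriumBuilder's rule: start and end must be set before building
        if self.start is None or not self.ends:
            raise ValueError('start_with() and end_with() are required before build()')
        return {'start': self.start, 'ends': self.ends, 'nodes': self.nodes}

graph = MiniBuilder().add_node('a').add_node('b').start_with('a').end_with('b').build()
```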

Node Types

Arium workflows support several types of nodes, each serving different purposes:

Agent Nodes

Agents are the primary executable nodes in Arium workflows. They use LLMs to process inputs and generate responses.
from flo_ai.agent import AgentBuilder
from flo_ai.arium import AriumBuilder
from flo_ai.llm import OpenAI

agent = (
    AgentBuilder()
    .with_name('analyzer')
    .with_prompt('Analyze the input content.')
    .with_llm(OpenAI(model='gpt-4o-mini'))
    .build()
)

# Use agent in workflow
workflow = (
    AriumBuilder()
    .add_agent(agent)
    .start_with(agent)
    .end_with(agent)
)

Function Nodes

Function nodes allow you to execute custom Python functions within workflows. They can be synchronous or asynchronous.
import asyncio

from flo_ai.arium.nodes import FunctionNode

# Synchronous function
def process_data(inputs, variables=None, **kwargs):
    # Process inputs
    result = f"Processed: {inputs}"
    return result

# Asynchronous function
async def async_process(inputs, variables=None, **kwargs):
    # Async processing
    await asyncio.sleep(0.1)
    return f"Async processed: {inputs}"

# Create function nodes
sync_node = FunctionNode(
    name='data_processor',
    description='Processes input data',
    function=process_data
)

async_node = FunctionNode(
    name='async_processor',
    description='Asynchronously processes data',
    function=async_process
)

# Use in workflow
workflow = (
    AriumBuilder()
    .add_function_node(sync_node)
    .add_function_node(async_node)
    .start_with(sync_node)
    .connect(sync_node, async_node)
    .end_with(async_node)
)

Arium Nodes (Nested Workflows)

Arium nodes allow you to embed one workflow inside another, creating hierarchical workflows with isolated memory.
# Create a sub-workflow
sub_workflow = (
    AriumBuilder()
    .add_agents([agent_a, agent_b])
    .start_with(agent_a)
    .connect(agent_a, agent_b)
    .end_with(agent_b)
    .build()
)

# Embed as a node in parent workflow
from flo_ai.arium.nodes import AriumNode

nested_node = AriumNode(
    name='sub_workflow',
    arium=sub_workflow,
    inherit_variables=True  # Pass parent variables to sub-workflow
)

# Use in parent workflow
parent_workflow = (
    AriumBuilder()
    .add_agent(main_agent)
    .add_arium(sub_workflow, name='sub_workflow', inherit_variables=True)
    .start_with(main_agent)
    .connect(main_agent, nested_node)
    .end_with(nested_node)
)

ForEach Nodes

ForEach nodes execute a node on each item in a collection, useful for batch processing.
from flo_ai.arium.nodes import ForEachNode

# Create a ForEach node that processes each item with an agent
foreach_node = ForEachNode(
    name='batch_processor',
    execute_node=agent  # Agent to execute on each item
)

# Use in workflow
workflow = (
    AriumBuilder()
    .add_agent(agent)
    .add_foreach('batch_processor', agent)
    .start_with(foreach_node)
    .end_with(foreach_node)
    .build()
)

# When run with multiple inputs, each input is processed sequentially
result = await workflow.run([
    "Process item 1",
    "Process item 2",
    "Process item 3"
])

Combining Node Types

You can combine different node types in a single workflow:
# Create different node types
agent = AgentBuilder().with_name('agent').with_prompt('...').with_llm(llm).build()
function_node = FunctionNode(name='processor', description='...', function=process_func)
sub_arium = AriumBuilder().add_agents([...]).build()
nested_node = AriumNode(name='nested', arium=sub_arium)
foreach_node = ForEachNode(name='batch', execute_node=agent)

# Combine in workflow
workflow = (
    AriumBuilder()
    .add_agent(agent)
    .add_function_node(function_node)
    .add_arium(sub_arium, name='nested')
    .add_foreach('batch', agent)
    .start_with(agent)
    .connect(agent, function_node)
    .connect(function_node, nested_node)
    .connect(nested_node, foreach_node)
    .end_with(foreach_node)
)

Basic Workflow Creation

Simple Agent Chain

Create a linear workflow with multiple agents:
from flo_ai.arium import AriumBuilder
from flo_ai.agent import AgentBuilder
from flo_ai.llm import OpenAI

async def simple_chain():
    llm = OpenAI(model='gpt-4o-mini')

    # Create agents using AgentBuilder
    analyst = (
        AgentBuilder()
        .with_name('content_analyst')
        .with_prompt('Analyze the input and extract key insights.')
        .with_llm(llm)
        .build()
    )

    summarizer = (
        AgentBuilder()
        .with_name('summarizer')
        .with_prompt('Create a concise summary based on the analysis.')
        .with_llm(llm)
        .build()
    )

    # Build and run workflow
    result = await (
        AriumBuilder()
        .add_agents([analyst, summarizer])
        .start_with(analyst)
        .connect(analyst, summarizer)
        .end_with(summarizer)
        .build_and_run(["Analyze this complex business report..."])
    )

    return result

Conditional Routing

Route to different agents based on conditions:
from flo_ai.arium.memory import MessageMemory, MessageMemoryItem
from typing import List

def route_by_type(memory: MessageMemory) -> str:
    """Route based on classification result"""
    messages: List[MessageMemoryItem] = memory.get()
    
    if not messages:
        return "business_specialist"  # Default route
    
    # Access the last message result
    last_message_item = messages[-1]
    last_message_content = str(last_message_item.result)
    
    if "technical" in last_message_content.lower():
        return "tech_specialist"
    else:
        return "business_specialist"

# Build workflow with conditional routing
result = await (
    AriumBuilder()
    .add_agents([classifier, tech_specialist, business_specialist, final_agent])
    .start_with(classifier)
    .add_edge(classifier, [tech_specialist, business_specialist], route_by_type)
    .connect(tech_specialist, final_agent)
    .connect(business_specialist, final_agent)
    .end_with(final_agent)
    .build_and_run(["How can we optimize our database performance?"])
)

YAML-Based Workflows

Define entire workflows in YAML for easy management:
metadata:
  name: "content-analysis-workflow"
  version: "1.0.0"
  description: "Multi-agent content analysis pipeline"

arium:
  agents:
    - name: "analyzer"
      role: "Content Analyst"
      job: "Analyze the input content and extract key insights."
      model:
        provider: "openai"
        name: "gpt-4o-mini"

    - name: "summarizer"
      role: "Content Summarizer"
      job: "Create a concise summary based on the analysis."
      model:
        provider: "anthropic"
        name: "claude-3-5-sonnet-20240620"

  workflow:
    start: "analyzer"
    edges:
      - from: "analyzer"
        to: ["summarizer"]
    end: ["summarizer"]

# Run YAML workflow
result = await (
    AriumBuilder()
    .from_yaml(yaml_file='workflow.yaml')
    .build_and_run(["Analyze this quarterly business report..."])
)

Advanced Routing

LLM-Powered Routers

Use LLMs for intelligent routing decisions:
routers:
  - name: "content_type_router"
    type: "smart" # Uses LLM for intelligent routing
    routing_options:
      technical_writer: "Technical content, documentation, tutorials"
      creative_writer: "Creative writing, storytelling, fiction"
      marketing_writer: "Marketing copy, sales content, campaigns"
    model:
      provider: "openai"
      name: "gpt-4o-mini"

ReflectionRouter

For A → B → A feedback patterns, where a writer's output is critiqued and then revised:
routers:
  - name: "reflection_router"
    type: "reflection"
    flow_pattern: [writer, critic, writer] # A → B → A pattern
    model:
      provider: "openai"
      name: "gpt-4o-mini"
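Conceptually, a reflection router just walks execution through a fixed node sequence. A plain-Python sketch of that idea (illustrative only, using a simple step counter; not Flo AI's actual router implementation):

```python
def make_reflection_router(flow_pattern):
    """Return a router that walks a fixed flow pattern, e.g. ['writer', 'critic', 'writer']."""
    state = {'step': 0}

    def route(memory=None):
        # advance one step through the pattern per call; None signals the flow is done
        if state['step'] >= len(flow_pattern):
            return None
        node = flow_pattern[state['step']]
        state['step'] += 1
        return node

    return route

router = make_reflection_router(['writer', 'critic', 'writer'])
visits = [router(), router(), router(), router()]
```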

PlanExecuteRouter

For Cursor-style plan-and-execute workflows:
routers:
  - name: "plan_router"
    type: "plan_execute"
    agents:
      planner: "Creates detailed execution plans"
      developer: "Implements features according to plan"
      tester: "Tests implementations and validates functionality"
      reviewer: "Reviews and approves completed work"
    settings:
      planner_agent: planner
      executor_agent: developer
      reviewer_agent: reviewer

Workflow Patterns

Sequential Processing

# A → B → C
workflow = (
    AriumBuilder()
    .add_agents([agent_a, agent_b, agent_c])
    .start_with(agent_a)
    .connect(agent_a, agent_b)
    .connect(agent_b, agent_c)
    .end_with(agent_c)
)

Parallel Processing

# A → [B, C] → D
workflow = (
    AriumBuilder()
    .add_agents([agent_a, agent_b, agent_c, agent_d])
    .start_with(agent_a)
    .add_edge(agent_a, [agent_b, agent_c])  # fan out to B and C
    .connect(agent_b, agent_d)
    .connect(agent_c, agent_d)
    .end_with(agent_d)
)

Fan-out/Fan-in

# A → [B, C, D] → E
workflow = (
    AriumBuilder()
    .add_agents([agent_a, agent_b, agent_c, agent_d, agent_e])
    .start_with(agent_a)
    .add_edge(agent_a, [agent_b, agent_c, agent_d])  # fan out to B, C, and D
    .connect(agent_b, agent_e)
    .connect(agent_c, agent_e)
    .connect(agent_d, agent_e)
    .end_with(agent_e)
)

Memory Management

Shared Memory

from flo_ai.arium.memory import MessageMemory

# Create shared memory
shared_memory = MessageMemory()

workflow = (
    AriumBuilder()
    .add_agents([agent_a, agent_b])
    .with_memory(shared_memory)
    .start_with(agent_a)
    .connect(agent_a, agent_b)
    .end_with(agent_b)
)

Custom Memory

You can extend MessageMemory to add custom functionality:
from flo_ai.arium.memory import MessageMemory, MessageMemoryItem
from typing import List

class CustomMemory(MessageMemory):
    def __init__(self):
        super().__init__()
        # Add any custom attributes here
        self.custom_data = {}

    def get_filtered_by_custom_logic(self, filter_func) -> List[MessageMemoryItem]:
        """Custom method to filter messages"""
        return [msg for msg in self.messages if filter_func(msg)]

# Use custom memory in workflow
custom_memory = CustomMemory()
workflow = (
    AriumBuilder()
    .add_agents([agent_a, agent_b])
    .with_memory(custom_memory)
    .start_with(agent_a)
    .connect(agent_a, agent_b)
    .end_with(agent_b)
)

Workflow Execution

Running Workflows

Workflows can be executed using build_and_run() or by building first and then running:
# Build and run in one step
result = await (
    AriumBuilder()
    .add_agents([agent_a, agent_b])
    .start_with(agent_a)
    .connect(agent_a, agent_b)
    .end_with(agent_b)
    .build_and_run(["Process this input"])
)

# Or build first, then run
workflow = (
    AriumBuilder()
    .add_agents([agent_a, agent_b])
    .start_with(agent_a)
    .connect(agent_a, agent_b)
    .end_with(agent_b)
    .build()
)

result = await workflow.run(["Process this input"])

# With variables
result = await workflow.run(
    ["Process this input"],
    variables={'user_id': '123', 'context': 'production'}
)

Error Handling

Error handling in Arium workflows is managed at the agent level. Configure retries and error handling when building agents:
from flo_ai.agent import AgentBuilder
from flo_ai.llm import OpenAI

# Create agents with retry configuration
agent_a = (
    AgentBuilder()
    .with_name('agent_a')
    .with_prompt('Process the input.')
    .with_llm(OpenAI(model='gpt-4o-mini'))
    .with_retries(3)  # Retry up to 3 times on failure
    .build()
)

agent_b = (
    AgentBuilder()
    .with_name('agent_b')
    .with_prompt('Continue processing.')
    .with_llm(OpenAI(model='gpt-4o-mini'))
    .with_retries(2)  # Retry up to 2 times on failure
    .build()
)

# Build workflow with error-resilient agents
workflow = (
    AriumBuilder()
    .add_agents([agent_a, agent_b])
    .start_with(agent_a)
    .connect(agent_a, agent_b)
    .end_with(agent_b)
    .build()
)
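The behavior that `with_retries(3)` configures can be pictured as a simple retry-on-exception loop. A generic sketch under that assumption (illustrative only; not Flo AI's actual retry code):

```python
import asyncio

async def run_with_retries(fn, max_retries=3):
    """Call fn(), retrying up to max_retries times on exception (simplified sketch)."""
    last_error = None
    for attempt in range(1, max_retries + 1):
        try:
            return await fn()
        except Exception as exc:
            last_error = exc  # a real system would log and possibly back off here
    raise last_error

# Example: a flaky step that succeeds on the third call
calls = {'n': 0}

async def flaky():
    calls['n'] += 1
    if calls['n'] < 3:
        raise RuntimeError('transient failure')
    return 'ok'

result = asyncio.run(run_with_retries(flaky, max_retries=3))
```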

Performance Optimization

Parallel Execution

Arium automatically executes agents in parallel when multiple agents are connected from the same source node:
# Agents B and C will execute in parallel after A completes
workflow = (
    AriumBuilder()
    .add_agents([agent_a, agent_b, agent_c, agent_d])
    .start_with(agent_a)
    .add_edge(agent_a, [agent_b, agent_c])  # Parallel execution
    .connect(agent_b, agent_d)
    .connect(agent_c, agent_d)
    .end_with(agent_d)
)
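Conceptually, the parallel fan-out above behaves like `asyncio.gather` over the branch nodes: B and C start together once A finishes, and D waits for both. A plain-asyncio sketch of that execution shape (illustrative only, not Arium internals):

```python
import asyncio

async def run_node(name, delay):
    # stand-in for an agent execution; the delay simulates LLM latency
    await asyncio.sleep(delay)
    return f'{name} done'

async def fan_out_fan_in():
    # B and C run concurrently after A completes, then D consumes both results
    a = await run_node('A', 0.01)
    b, c = await asyncio.gather(run_node('B', 0.02), run_node('C', 0.02))
    d = await run_node('D', 0.01)
    return [a, b, c, d]

results = asyncio.run(fan_out_fan_in())
```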

Workflow Visualization

Visualize your workflow to understand the execution flow:
workflow = (
    AriumBuilder()
    .add_agents([agent_a, agent_b, agent_c])
    .start_with(agent_a)
    .connect(agent_a, agent_b)
    .connect(agent_b, agent_c)
    .end_with(agent_c)
    .visualize(output_path='workflow.png', title='My Workflow')
    .build()
)

Best Practices

Workflow Design

  1. Keep it simple: Start with linear workflows before adding complexity
  2. Use meaningful names: Name agents and workflows descriptively
  3. Handle errors: Always implement error handling and recovery
  4. Test thoroughly: Test workflows with various inputs

Performance Tips

  1. Use appropriate models: Choose models based on task complexity
  2. Implement caching: Cache expensive operations
  3. Optimize routing: Use efficient routing logic
  4. Monitor performance: Use telemetry to track workflow performance
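For the caching tip, an expensive synchronous step can be memoized with the standard library before it is wrapped in a node. A minimal sketch using `functools.lru_cache` (illustrative only; the lookup function is hypothetical):

```python
from functools import lru_cache

call_count = {'n': 0}

@lru_cache(maxsize=128)
def expensive_lookup(query: str) -> str:
    # stand-in for an expensive operation (API call, embedding, database query)
    call_count['n'] += 1
    return f'result for {query}'

# repeated inputs hit the cache instead of recomputing
first = expensive_lookup('q1')
second = expensive_lookup('q1')   # served from cache
third = expensive_lookup('q2')    # new input, computed once
```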

Debugging

Enable debug logging at the Python level to troubleshoot workflows:
import logging

# Enable debug logging
logging.basicConfig(level=logging.DEBUG)

# Build and run workflow - debug logs will show execution flow
workflow = (
    AriumBuilder()
    .add_agents([agent_a, agent_b])
    .start_with(agent_a)
    .connect(agent_a, agent_b)
    .end_with(agent_b)
    .build()
)

result = await workflow.run(["Test input"])