If you’re using OpenClaw for more than just simple, single-shot tasks, you’ve probably hit a wall when trying to manage multiple, ongoing AI workflows. The default OpenClaw setup, while robust for individual agent runs, doesn’t inherently parallelize complex, multi-stage operations very well. This often leads to a bottleneck where a single long-running task hogs resources, preventing other, potentially more urgent, agents from executing. Imagine you’re running a market sentiment analysis agent that processes daily news, an automated customer support agent that handles incoming queries, and a content generation agent that drafts blog posts – all simultaneously. Without a proper sub-agent architecture, these tasks will often queue up, leading to delays and missed opportunities.
Understanding the Problem with Single-Threaded Agents
By default, when you initiate an OpenClaw agent, say with openclaw run my_main_agent.py, that script typically encompasses the entire logic for a specific task. If that task involves multiple steps – fetching data, processing it with one LLM, then refining the output with another – it’s all handled sequentially within that single agent’s execution context. This is perfectly fine for many use cases. However, when you introduce concurrency requirements, like needing to process ten independent data streams or respond to five simultaneous user queries, the single agent model quickly becomes a bottleneck. Your main agent script ends up being a monolithic entity trying to manage disparate concerns, often leading to complex state management and error handling within one file, making it harder to debug and scale.
The core issue is resource contention and sequential execution. If your my_main_agent.py is busy fetching a large dataset or waiting for a slow LLM response, any other logical “task” within that same script has to wait. Even if you try to manage multiple internal “workflows” with async/await patterns in Python, you’re still constrained by the overall execution context of that single agent process. For true parallel processing, especially across different LLM calls that might have varying latencies or rate limits, you need a more distributed approach.
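To see the cost of sequential execution concretely, here is a toy sketch with simulated slow calls (the `time.sleep` delays stand in for LLM or network latency; the names are illustrative). Sequential steps pay the sum of all latencies, while independent work dispatched in parallel (simulated here with threads, and done in the architecture below with separate agent processes) pays only the longest one:

```python
import time
from concurrent.futures import ThreadPoolExecutor

def slow_call(name, delay):
    # Stand-in for a slow LLM or network call.
    time.sleep(delay)
    return name

# Sequential: total wall time is the sum of the delays.
start = time.perf_counter()
for name in ("fetch", "analyze", "summarize"):
    slow_call(name, 0.1)
sequential = time.perf_counter() - start

# Parallel: independent calls overlap, so wall time is roughly the longest delay.
start = time.perf_counter()
with ThreadPoolExecutor() as pool:
    list(pool.map(slow_call, ("fetch", "analyze", "summarize"), (0.1, 0.1, 0.1)))
parallel = time.perf_counter() - start

print(f"sequential: {sequential:.2f}s, parallel: {parallel:.2f}s")
```

The same arithmetic applies when the "slow calls" are whole sub-agent processes rather than in-process functions.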
Implementing a Sub-Agent Architecture
The solution lies in breaking down your complex workflows into smaller, independent OpenClaw agents – let’s call them “sub-agents” – each responsible for a specific, atomic part of the overall process. The main agent then acts as an orchestrator, dispatching tasks to these sub-agents and collecting their results. This mimics a microservices architecture, but within the OpenClaw ecosystem.
Here’s how you structure it:
- Orchestrator Agent: This is your primary OpenClaw agent. Its role is to define the overall workflow, determine which sub-agents need to run, and manage their execution.
- Sub-Agents: These are individual OpenClaw agent scripts (e.g., data_fetch_agent.py, sentiment_analysis_agent.py, summary_generation_agent.py). Each sub-agent should encapsulate a single, well-defined task. They are designed to be run independently and return a specific output.
To make this work, you’ll leverage OpenClaw’s ability to run agents programmatically and, crucially, to pass data between them. The orchestrator won’t just execute sub-agents; it will often need to feed them inputs and consume their outputs.
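Concretely, the file-based contract between orchestrator and sub-agent might look like the following sketch. The file names and JSON keys here are illustrative choices, not an OpenClaw convention; the sub-agent's side of the exchange is simulated in-process:

```python
import json

# Hypothetical contract: the orchestrator writes <agent>_input.json before
# launching a sub-agent; the sub-agent writes <agent>_output.json when done.
with open("sentiment_analyzer_input.json", "w") as f:
    json.dump({"text": "The stock market saw a significant rally today."}, f)

# A sub-agent would be launched here; we simulate its side of the contract.
with open("sentiment_analyzer_input.json") as f:
    text = json.load(f)["text"]
with open("sentiment_analyzer_output.json", "w") as f:
    json.dump({"sentiment": "positive"}, f)  # or {"error": "..."} on failure

# The orchestrator then consumes the result.
with open("sentiment_analyzer_output.json") as f:
    result = json.load(f)
print(result)  # {'sentiment': 'positive'}
```

Agreeing on a single "error" key for failures keeps the orchestrator's result handling uniform across all sub-agents.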
Orchestrator Configuration and Logic
Let’s say your main agent is orchestrator_agent.py. Inside this agent, you’ll define the sequence of operations. Instead of directly calling LLMs for every step, you’ll invoke other OpenClaw agents. OpenClaw provides a programmatic API for this, but a more robust way for persistent, decoupled execution is to use file-based communication or a lightweight message queue if your setup permits. For simplicity and reliability on a VPS, we’ll stick to file-based communication.
Your orchestrator_agent.py might look something like this:
```python
import json
import subprocess
import os
import time

class OrchestratorAgent:
    def __init__(self):
        self.output_dir = "/var/openclaw/outputs"
        os.makedirs(self.output_dir, exist_ok=True)

    def run_sub_agent(self, agent_name, input_data):
        input_file = os.path.join(self.output_dir, f"{agent_name}_input.json")
        output_file = os.path.join(self.output_dir, f"{agent_name}_output.json")
        with open(input_file, "w") as f:
            json.dump(input_data, f)

        # Run the sub-agent in the background or wait for it.
        # For true parallelism, you'd daemonize this or use a process pool;
        # for simplicity here, we run it synchronously and wait.
        # To run truly async and check for completion, you'd manage PIDs.
        print(f"Running sub-agent: {agent_name} with input: {input_file}")
        command = [
            "openclaw", "run",
            f"agents/{agent_name}.py",
            "--input-file", input_file,
            "--output-file", output_file,
        ]
        try:
            # Use Popen to run asynchronously if you want to dispatch multiple
            # and then collect. For this example, we'll wait.
            process = subprocess.run(command, capture_output=True, text=True, check=True)
            print(f"Sub-agent {agent_name} finished. STDOUT: {process.stdout} STDERR: {process.stderr}")
            if os.path.exists(output_file):
                with open(output_file, "r") as f:
                    return json.load(f)
            else:
                print(f"Warning: {output_file} not found after {agent_name} execution.")
                return {"error": "output file not found"}
        except subprocess.CalledProcessError as e:
            print(f"Error running sub-agent {agent_name}: {e}")
            print(f"Sub-agent STDOUT: {e.stdout}")
            print(f"Sub-agent STDERR: {e.stderr}")
            return {"error": str(e), "stdout": e.stdout, "stderr": e.stderr}
        finally:
            # Clean up the input file if no longer needed.
            # You might want to keep output files for debugging or auditing.
            if os.path.exists(input_file):
                os.remove(input_file)

    def run(self):
        # Example workflow
        data_to_process = {"text": "The stock market saw a significant rally today, boosting investor confidence."}

        # Step 1: Fetch data (might be a simple pass-through or actual fetching logic)
        fetched_data = self.run_sub_agent("data_fetcher", data_to_process)
        if fetched_data and not fetched_data.get("error"):
            print(f"Fetched data: {fetched_data.get('content', 'N/A')}")
        else:
            print("Data fetch failed, exiting.")
            return

        # Step 2: Analyze sentiment
        sentiment_result = self.run_sub_agent("sentiment_analyzer", {"text": fetched_data.get("content")})
        if sentiment_result and not sentiment_result.get("error"):
            print(f"Sentiment: {sentiment_result.get('sentiment', 'N/A')}")
        else:
            print("Sentiment analysis failed, exiting.")
            return

        # Step 3: Generate a summary based on sentiment
        summary_input = {
            "text": fetched_data.get("content"),
            "sentiment": sentiment_result.get("sentiment"),
        }
        summary_result = self.run_sub_agent("summary_generator", summary_input)
        if summary_result and not summary_result.get("error"):
            print(f"Summary: {summary_result.get('summary', 'N/A')}")
        else:
            print("Summary generation failed, exiting.")
            return

        print("Workflow completed successfully!")

# To run this orchestrator:
if __name__ == "__main__":
    agent = OrchestratorAgent()
    agent.run()
```
Sub-Agent Structure
Each sub-agent needs to be designed to read its input from a specified file (--input-file) and write its output to another specified file (--output-file). This standardized interface is critical for the orchestrator to interact with them effectively. Here’s an example of a sentiment_analyzer.py sub-agent:
```python
import json
import argparse
import os
from openclaw import OpenClaw

class SentimentAnalyzerAgent:
```
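A minimal complete version of such a sub-agent, assuming the file-based --input-file/--output-file interface described above, might look like the following. Note the hedges: the keyword heuristic is a stand-in for the real LLM call (the OpenClaw client API is not shown here), and the run-guard only fires when CLI arguments are actually supplied:

```python
import json
import argparse
import sys

# from openclaw import OpenClaw  # the real LLM call would go through this client

class SentimentAnalyzerAgent:
    def run(self, text):
        # Stand-in for an LLM call: a trivial keyword heuristic.
        lowered = text.lower()
        if any(w in lowered for w in ("rally", "gain", "boost")):
            return "positive"
        if any(w in lowered for w in ("drop", "loss", "crash")):
            return "negative"
        return "neutral"

def main(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument("--input-file", required=True)
    parser.add_argument("--output-file", required=True)
    args = parser.parse_args(argv)
    with open(args.input_file) as f:
        payload = json.load(f)
    sentiment = SentimentAnalyzerAgent().run(payload.get("text", ""))
    # Write the result under the "sentiment" key the orchestrator expects.
    with open(args.output_file, "w") as f:
        json.dump({"sentiment": sentiment}, f)

if __name__ == "__main__" and len(sys.argv) > 1:
    main()
```

Because every sub-agent shares this argument and file contract, the orchestrator's run_sub_agent method can launch any of them without special-casing.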