# Example: Research pipeline
**Use case:** automated competitive intelligence. A user asks for a report on a company or topic; the system searches the web, writes a structured report, and edits it for clarity. The pipeline runs unattended, recovers from crashes, and stores every run for later review.
## Full code

```python
import os

from agentspan.agents import Agent, tool, run  # agentspan runtime
from pydantic import BaseModel

BRAVE_API_KEY = os.environ["BRAVE_API_KEY"]  # set in your environment

# ── Tools ────────────────────────────────────────────────────────────────────

@tool
def search_web(query: str) -> list[dict]:
    """Search the web. Returns a list of {title, url, snippet} results."""
    import httpx

    # Replace with your preferred search API (Brave, SerpAPI, Exa, etc.)
    resp = httpx.get(
        "https://api.search.brave.com/res/v1/web/search",
        params={"q": query, "count": 8},
        headers={"Accept": "application/json", "X-Subscription-Token": BRAVE_API_KEY},
    )
    resp.raise_for_status()
    return resp.json()["web"]["results"]

@tool
def fetch_page(url: str) -> str:
    """Fetch and extract readable text from a URL."""
    import httpx
    from markdownify import markdownify

    resp = httpx.get(url, timeout=10, follow_redirects=True)
    return markdownify(resp.text)[:8000]  # trim to 8k chars

# ── Output schema ─────────────────────────────────────────────────────────────

class IntelligenceReport(BaseModel):
    subject: str
    summary: str             # 2–3 sentence executive summary
    key_findings: list[str]  # bullet-point findings
    sources: list[str]       # URLs used
    gaps: list[str]          # what couldn't be confirmed

# ── Agents ───────────────────────────────────────────────────────────────────

researcher = Agent(
    name="researcher",
    model="anthropic/claude-sonnet-4-6",
    tools=[search_web, fetch_page],
    instructions="""You are a research analyst. Given a topic:
1. Run 3–5 targeted searches
2. Fetch and read the most relevant pages
3. Collect facts, numbers, and quotes with source URLs
4. Return a structured research brief (not prose — facts and sources)""",
)

writer = Agent(
    name="writer",
    model="anthropic/claude-sonnet-4-6",
    output_type=IntelligenceReport,
    instructions="""You are a business intelligence writer.
Given a research brief, produce a structured IntelligenceReport.
Be specific. Use numbers and dates where available. Cite sources.""",
)

editor = Agent(
    name="editor",
    model="openai/gpt-4o",
    output_type=IntelligenceReport,
    instructions="""You are a senior editor. Review the report for:
- Factual consistency (does the report contradict itself?)
- Unsupported claims (flag any assertion without a source)
- Clarity (rewrite unclear sentences, keep the rest unchanged)
Return the corrected IntelligenceReport.""",
)

# ── Run ───────────────────────────────────────────────────────────────────────

if __name__ == "__main__":
    topic = "The rise of multi-agent systems in production software"
    result = run(researcher >> writer >> editor, topic)

    # Live run output is a dict with a 'result' key
    report_data = result.output.get("result", result.output)
    print(report_data)
    print(f"Run ID: {result.execution_id}")
```
## What this demonstrates
- **Multi-agent pipeline (`>>`)** — three agents run sequentially. Each downstream agent sees only the output of the previous one, not the original prompt. The researcher's output is the writer's input; the writer's output is the editor's input.
- **Structured output** — both writer and editor use `output_type=IntelligenceReport`. Agentspan enforces the Pydantic schema on every LLM response, retrying if the model returns malformed output.
- **Crash recovery** — the pipeline runs on the Agentspan server. If your process dies mid-run, the server resumes from the current agent when you restart. Nothing is re-run from scratch.
- **Run history** — every execution is stored with inputs, outputs, token usage, and timing. Browse via CLI or UI:
```shell
agentspan agent execution --name researcher --since 7d
agentspan agent execution --name writer --status FAILED --since 1d
```
Or open http://localhost:6767 to browse execution history and replay past runs.
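The contract behind `output_type=IntelligenceReport` is ordinary Pydantic validation, so you can exercise the schema in isolation, with no agentspan runtime involved. A minimal sketch; the sample payloads here are made up:

```python
from pydantic import BaseModel, ValidationError

class IntelligenceReport(BaseModel):
    subject: str
    summary: str
    key_findings: list[str]
    sources: list[str]
    gaps: list[str]

# A well-formed payload validates cleanly.
ok = IntelligenceReport.model_validate({
    "subject": "multi-agent systems",
    "summary": "Adoption is growing.",
    "key_findings": ["Finding A"],
    "sources": ["https://example.com"],
    "gaps": [],
})
print(ok.subject)  # → multi-agent systems

# A payload missing required fields raises ValidationError:
# this is the malformed-output condition the runtime retries on.
try:
    IntelligenceReport.model_validate({"subject": "incomplete"})
except ValidationError as e:
    print(f"{len(e.errors())} missing fields")  # → 4 missing fields
```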
## Variations

### Run multiple topics concurrently
```python
from agentspan.agents import start

topics = [
    "Multi-agent frameworks reshaping software development",
    "LangGraph 1.0 production deployments",
    "CrewAI enterprise customer traction",
]

handles = [start(researcher >> writer >> editor, t) for t in topics]
results = [h.stream().get_result() for h in handles]

for r in results:
    report_data = r.output.get("result", r.output)
    print(report_data)
```
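The `start` / `get_result` pattern is the same fan-out, fan-in shape as standard-library futures. If it helps to see the control flow in isolation, here is a stdlib-only analogy; `run_pipeline` is a placeholder standing in for the agent pipeline, not part of agentspan:

```python
from concurrent.futures import ThreadPoolExecutor

def run_pipeline(topic: str) -> str:
    # Placeholder for researcher >> writer >> editor.
    return f"report on {topic}"

topics = ["topic A", "topic B", "topic C"]

with ThreadPoolExecutor() as pool:
    # submit() returns a handle immediately, like start();
    # result() blocks until completion, like get_result().
    handles = [pool.submit(run_pipeline, t) for t in topics]
    results = [h.result() for h in handles]

print(results)  # → ['report on topic A', 'report on topic B', 'report on topic C']
```

Results come back in submission order regardless of which topic finishes first, matching the handle-list pattern above.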
### Swap models per stage
Use a cheaper model for research (many small calls) and a stronger model for writing:
```python
researcher = Agent(name="researcher", model="google_gemini/gemini-2.0-flash", ...)
writer = Agent(name="writer", model="anthropic/claude-sonnet-4-6", ...)
editor = Agent(name="editor", model="openai/gpt-4o", ...)
```
### Schedule as a daily job

```python
import time

import schedule  # third-party: pip install schedule

WATCH_LIST = ["..."]  # your list of topics to track daily

def run_daily_intel():
    for topic in WATCH_LIST:
        start(researcher >> writer >> editor, topic)

schedule.every().day.at("07:00").do(run_daily_intel)

while True:
    schedule.run_pending()
    time.sleep(60)
```
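If you would rather not add the third-party `schedule` dependency, the same daily trigger can be built with only the standard library. A sketch (not part of agentspan) that computes how long to sleep until the next 07:00:

```python
from datetime import datetime, timedelta

def seconds_until(hour: int, minute: int = 0) -> float:
    """Seconds from now until the next occurrence of hour:minute."""
    now = datetime.now()
    target = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if target <= now:
        target += timedelta(days=1)  # already past today's slot: use tomorrow's
    return (target - now).total_seconds()

# Main loop (sketch):
# while True:
#     time.sleep(seconds_until(7))
#     run_daily_intel()
```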
## Testing
Test the pipeline without LLM calls or a running server:
```python
from agentspan.agents.testing import mock_run, MockEvent, expect

result = mock_run(
    researcher,
    "The rise of multi-agent systems in production software",
    events=[
        MockEvent.tool_call("search_web", {"query": "multi-agent systems production 2026"}),
        MockEvent.tool_result("search_web", [{"title": "...", "url": "...", "snippet": "..."}]),
        MockEvent.tool_call("fetch_page", {"url": "https://www.infoq.com/articles/multi-agent-production-2026"}),
        MockEvent.tool_result("fetch_page", "Multi-agent systems are increasingly deployed in production..."),
        MockEvent.done("Research complete. Key facts: ..."),
    ],
)

expect(result).completed().used_tool("search_web").used_tool("fetch_page")
```