
Example: Research pipeline

Use case: Automated competitive intelligence. A user asks for a report on a company or topic — the system searches the web, writes a structured report, and edits it for clarity. Runs unattended, recovers from crashes, and stores every run for later review.


Full code

from agentspan.agents import Agent, tool, run, start  # agentspan runtime
from pydantic import BaseModel

# ── Tools ────────────────────────────────────────────────────────────────────

@tool
def search_web(query: str) -> list[dict]:
    """Search the web. Returns a list of {title, url, snippet} results."""
    import os
    import httpx
    # Replace with your preferred search API (Brave, SerpAPI, Exa, etc.)
    resp = httpx.get(
        "https://api.search.brave.com/res/v1/web/search",
        params={"q": query, "count": 8},
        headers={
            "Accept": "application/json",
            "X-Subscription-Token": os.environ["BRAVE_API_KEY"],
        },
        timeout=10,
    )
    resp.raise_for_status()
    return resp.json()["web"]["results"]

@tool
def fetch_page(url: str) -> str:
    """Fetch and extract readable text from a URL."""
    import httpx
    from markdownify import markdownify
    resp = httpx.get(url, timeout=10, follow_redirects=True)
    return markdownify(resp.text)[:8000]  # trim to 8k chars

# ── Output schema ─────────────────────────────────────────────────────────────

class IntelligenceReport(BaseModel):
    subject: str
    summary: str                # 2–3 sentence executive summary
    key_findings: list[str]     # bullet-point findings
    sources: list[str]          # URLs used
    gaps: list[str]             # what couldn't be confirmed

# ── Agents ───────────────────────────────────────────────────────────────────

researcher = Agent(
    name="researcher",
    model="anthropic/claude-sonnet-4-6",
    tools=[search_web, fetch_page],
    instructions="""You are a research analyst. Given a topic:
1. Run 3–5 targeted searches
2. Fetch and read the most relevant pages
3. Collect facts, numbers, and quotes with source URLs
4. Return a structured research brief (not prose — facts and sources)""",
)

writer = Agent(
    name="writer",
    model="anthropic/claude-sonnet-4-6",
    output_type=IntelligenceReport,
    instructions="""You are a business intelligence writer.
Given a research brief, produce a structured IntelligenceReport.
Be specific. Use numbers and dates where available. Cite sources.""",
)

editor = Agent(
    name="editor",
    model="openai/gpt-4o",
    output_type=IntelligenceReport,
    instructions="""You are a senior editor. Review the report for:
- Factual consistency (does the report contradict itself?)
- Unsupported claims (flag any assertion without a source)
- Clarity (rewrite unclear sentences, keep the rest unchanged)
Return the corrected IntelligenceReport.""",
)

# ── Run ───────────────────────────────────────────────────────────────────────

if __name__ == "__main__":
    topic = "The rise of multi-agent systems in production software"

    result = run(researcher >> writer >> editor, topic)
    # The runtime may wrap the final output in a dict under a 'result' key;
    # unwrap it if so, otherwise use the output as-is.
    report_data = result.output.get("result", result.output)
    print(report_data)
    print(f"Run ID:  {result.execution_id}")

What this demonstrates

Multi-agent pipeline (>>) — three agents run sequentially. Each downstream agent sees only the output of the previous one, not the original prompt: the researcher’s output is the writer’s input, and the writer’s output is the editor’s input.
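
Conceptually, the >> operator behaves like left-to-right function composition. This is a plain-Python sketch of that data flow, not Agentspan’s internals — the stand-in “agents” here are ordinary functions:

```python
from functools import reduce

def run_pipeline(stages, prompt):
    """Chain stages so each one sees only the previous stage's output."""
    return reduce(lambda output, stage: stage(output), stages, prompt)

# Stand-in "agents": plain functions that transform text
research = lambda topic: f"brief({topic})"
write    = lambda brief: f"report({brief})"
edit     = lambda report: f"edited({report})"

print(run_pipeline([research, write, edit], "multi-agent systems"))
# edited(report(brief(multi-agent systems)))
```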

Structured output — both writer and editor use output_type=IntelligenceReport. Agentspan enforces the Pydantic schema on every LLM response, retrying if the model returns malformed output.
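
The parse-or-fail behavior this relies on can be seen with Pydantic v2 directly. The retry loop itself belongs to the runtime; this sketch only shows what “malformed output” means for the schema above:

```python
from pydantic import BaseModel, ValidationError

class IntelligenceReport(BaseModel):
    subject: str
    summary: str
    key_findings: list[str]
    sources: list[str]
    gaps: list[str]

# A well-formed model response parses cleanly...
raw = '{"subject": "X", "summary": "Y", "key_findings": [], "sources": [], "gaps": []}'
report = IntelligenceReport.model_validate_json(raw)

# ...while a malformed one raises ValidationError — the signal a runtime
# can use to re-prompt the model.
try:
    IntelligenceReport.model_validate_json('{"subject": "X"}')
except ValidationError as e:
    print(f"{e.error_count()} missing fields")
```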

Crash recovery — the pipeline runs on the Agentspan server. If your process dies mid-run, the server resumes from the current agent when you restart. Nothing is re-run from scratch.

Run history — every execution is stored with inputs, outputs, token usage, and timing. Browse via CLI or UI:

agentspan agent execution --name researcher --since 7d
agentspan agent execution --name writer --status FAILED --since 1d

Or open http://localhost:6767 to browse execution history and replay past runs.


Variations

Run multiple topics concurrently

from agentspan.agents import start

topics = [
    "Multi-agent frameworks reshaping software development",
    "LangGraph 1.0 production deployments",
    "CrewAI enterprise customer traction",
]

handles = [start(researcher >> writer >> editor, t) for t in topics]
results = [h.stream().get_result() for h in handles]

for r in results:
    report_data = r.output.get('result', r.output)
    print(report_data)

Swap models per stage

Use a cheaper model for research (many small calls) and a stronger model for writing:

researcher = Agent(name="researcher", model="google_gemini/gemini-2.0-flash", ...)
writer     = Agent(name="writer",     model="anthropic/claude-sonnet-4-6", ...)
editor     = Agent(name="editor",     model="openai/gpt-4o", ...)

Schedule as a daily job

import schedule, time

WATCH_LIST = [
    # Your standing topics — placeholders shown here
    "Competitor product launches",
    "Agent framework funding rounds",
]

def run_daily_intel():
    for topic in WATCH_LIST:
        start(researcher >> writer >> editor, topic)

schedule.every().day.at("07:00").do(run_daily_intel)
while True:
    schedule.run_pending()
    time.sleep(60)
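
If you would rather not add the third-party schedule dependency, the same loop can be written with the standard library. A sketch — the helper name is ours, not part of any API:

```python
import datetime as dt

def seconds_until(hour: int, minute: int = 0) -> float:
    """Seconds from now until the next occurrence of hour:minute."""
    now = dt.datetime.now()
    target = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if target <= now:
        target += dt.timedelta(days=1)   # already past today, so wait for tomorrow
    return (target - now).total_seconds()

# while True:
#     time.sleep(seconds_until(7))       # wake at 07:00
#     run_daily_intel()
```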

Testing

Test the pipeline without LLM calls or a running server:

from agentspan.agents.testing import mock_run, MockEvent, expect

result = mock_run(
    researcher,
    "The rise of multi-agent systems in production software",
    events=[
        MockEvent.tool_call("search_web", {"query": "multi-agent systems production 2026"}),
        MockEvent.tool_result("search_web", [{"title": "...", "url": "...", "snippet": "..."}]),
        MockEvent.tool_call("fetch_page", {"url": "https://www.infoq.com/articles/multi-agent-production-2026"}),
        MockEvent.tool_result("fetch_page", "Multi-agent systems are increasingly deployed in production..."),
        MockEvent.done("Research complete. Key facts: ..."),
    ]
)

expect(result).completed().used_tool("search_web").used_tool("fetch_page")