
Example: Batch document processor

Use case: Contract review at scale. A legal or compliance team needs to review hundreds of contracts — extract key terms, flag risks, and produce a structured summary for each one. The job must survive process crashes and be resumable without re-processing completed files.


Full code

from agentspan.agents import Agent, tool, start
from pydantic import BaseModel, Field
from pathlib import Path
import json

# ── Output schema ─────────────────────────────────────────────────────────────

from enum import Enum

class RiskLevel(str, Enum):
    LOW    = "low"
    MEDIUM = "medium"
    HIGH   = "high"

class ContractReview(BaseModel):
    file: str
    contract_type: str                   # NDA, MSA, SOW, SaaS, etc.
    parties: list[str]
    effective_date: str | None
    expiry_date: str | None
    auto_renewal: bool
    payment_terms: str | None
    liability_cap: str | None
    ip_ownership: str | None
    risks: list[str] = Field(default_factory=list)
    risk_level: str                      # "low" | "medium" | "high"
    action_required: str | None          # What legal needs to do, if anything

# ── Tools ────────────────────────────────────────────────────────────────────

@tool
def read_contract(path: str) -> str:
    """Read a contract file and return its text content."""
    # Assumes a plain-text file; PDFs would need a text-extraction step first.
    return Path(path).read_text(encoding="utf-8")

@tool
def flag_for_review(path: str, reason: str, risk_level: str) -> dict:
    """Flag a contract for human legal review."""
    flags_db.insert({"path": path, "reason": reason, "risk": risk_level})
    return {"flagged": True, "path": path}

@tool
def save_review(path: str, review_json: str) -> dict:
    """Save the completed review to the output store."""
    review = json.loads(review_json)
    reviews_db.upsert({"file": path}, review)
    return {"saved": True}

# ── Agent ─────────────────────────────────────────────────────────────────────

contract_reviewer = Agent(
    name="contract_reviewer",
    model="anthropic/claude-sonnet-4-6",
    output_type=ContractReview,
    tools=[read_contract, flag_for_review, save_review],
    instructions="""You are a paralegal specializing in technology contracts.

For each contract:
1. Read the full text
2. Extract all required fields into ContractReview
3. List specific risks (unusual clauses, missing protections, unfavorable terms)
4. Assign a risk level: low (standard terms), medium (some concerns), high (legal review needed)
5. If risk is medium or high: call flag_for_review with a clear reason
6. Call save_review with the completed JSON

Be precise about dates and monetary amounts.
If a field is not present in the contract, use null — do not guess.""",
)

# ── Batch runner ──────────────────────────────────────────────────────────────

def process_contracts(contract_dir: str, max_concurrent: int = 10):
    paths = list(Path(contract_dir).glob("**/*.pdf")) + \
            list(Path(contract_dir).glob("**/*.txt"))

    print(f"Found {len(paths)} contracts to process")

    # Skip already-completed files (idempotent restarts).
    # save_review stores the parsed review dict, so any stored row counts as done.
    completed = {r["file"] for r in reviews_db.find({})}
    pending = [p for p in paths if str(p) not in completed]
    print(f"{len(completed)} already done, {len(pending)} remaining")

    # Process in batches to limit concurrency
    for i in range(0, len(pending), max_concurrent):
        batch = pending[i:i + max_concurrent]

        handles = {
            str(path): start(contract_reviewer, str(path))
            for path in batch
        }

        for path, handle in handles.items():
            try:
                result = handle.stream().get_result()
                review_data = result.output.get('result', result.output)
                risk = review_data.get('risk_level', '?') if isinstance(review_data, dict) else '?'
                print(f"  ✓ {Path(path).name}  [{risk}]")
            except TimeoutError:
                print(f"  ✗ {Path(path).name}  TIMEOUT — will retry next run")
            except Exception as e:
                print(f"  ✗ {Path(path).name}  FAILED: {e}")

if __name__ == "__main__":
    process_contracts("./contracts/", max_concurrent=10)
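The listing above references `flags_db` and `reviews_db` without defining them — they stand in for whatever datastore you use. For local experiments, a minimal in-memory stand-in (a sketch, not a production store; the `MemStore` name is invented here) that satisfies the `insert`/`upsert`/`find` calls could look like:

```python
class MemStore:
    """Dict-backed stand-in for flags_db / reviews_db (illustration only)."""

    def __init__(self):
        self.rows: list[dict] = []

    def insert(self, row: dict) -> None:
        self.rows.append(row)

    def upsert(self, match: dict, row: dict) -> None:
        # Replace any row matching every key/value pair in `match`, else append.
        self.rows = [r for r in self.rows
                     if not all(r.get(k) == v for k, v in match.items())]
        self.rows.append({**match, **row})

    def find(self, query: dict) -> list[dict]:
        # Every key/value in `query` must match; an empty query returns all rows.
        return [r for r in self.rows
                if all(r.get(k) == v for k, v in query.items())]

flags_db = MemStore()
reviews_db = MemStore()
```

Swapping in SQLite or a document store later only requires preserving these three call shapes.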

What this demonstrates

Idempotent restarts — the skip-already-completed logic means you can kill and restart the process at any time. Completed contracts aren’t re-processed; in-progress contracts resume from where the server left them.

Concurrent execution — start() launches each contract as a separate workflow. Up to max_concurrent run in parallel on the server, each with its own isolated state.

Progress visibility — while the batch runs, open http://localhost:6767 to see all active and completed executions in real time, filtered by agent name and status.

Per-file history — every contract review is stored with its full execution trace. A month from now you can replay any review, see exactly what the agent read, and audit the reasoning.
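The restart behavior above reduces to a set difference between discovered files and already-saved reviews. A stripped-down sketch of that selection step (plain Python, no agentspan required):

```python
def select_pending(paths: list[str], completed_files: set[str]) -> list[str]:
    """Return only the paths that have no saved review yet."""
    return [p for p in paths if p not in completed_files]

paths = ["contracts/a.txt", "contracts/b.txt", "contracts/c.txt"]
completed = {"contracts/b.txt"}  # e.g. {r["file"] for r in reviews_db.find({})}
print(select_pending(paths, completed))  # → ['contracts/a.txt', 'contracts/c.txt']
```

Because the completed set is rebuilt from the store on every run, a crash mid-batch loses nothing but the in-flight work.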


Variations

Resume a failed contract

Find failed executions via CLI or UI:

agentspan agent execution --name contract_reviewer --status FAILED --since 1d

Or open http://localhost:6767 to browse executions visually. Re-run a contract by passing the same path to start() again — the idempotent skip logic ensures already-completed contracts are not re-processed.
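Selecting which paths to re-run can also be scripted from execution records. Assuming each record carries the input path and a status field (the record shape here is an assumption for illustration, not the agentspan schema), a retry filter might look like:

```python
def retry_candidates(records: list[dict]) -> list[str]:
    """Paths whose most recent execution did not complete."""
    latest: dict[str, str] = {}
    for rec in records:  # records assumed ordered oldest-to-newest
        latest[rec["path"]] = rec["status"]
    return [path for path, status in latest.items() if status != "COMPLETED"]

records = [
    {"path": "contracts/a.txt", "status": "FAILED"},
    {"path": "contracts/b.txt", "status": "COMPLETED"},
    {"path": "contracts/a.txt", "status": "COMPLETED"},  # retried and succeeded
    {"path": "contracts/c.txt", "status": "FAILED"},
]
print(retry_candidates(records))  # → ['contracts/c.txt']
```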

Add a second-pass reviewer for high-risk contracts

senior_reviewer = Agent(
    name="senior_reviewer",
    model="openai/gpt-4o",
    output_type=ContractReview,
    instructions="You are a senior contracts attorney. Review the initial analysis...",
)

from agentspan.agents import run

# Route by risk level after first pass
def process_contract(path: str):
    result = run(contract_reviewer, path)
    review_data = result.output.get('result', result.output)
    if isinstance(review_data, dict) and review_data.get('risk_level') == "high":
        result = run(senior_reviewer, str(review_data))
    return result.output.get('result', result.output)

Stream progress to a dashboard

from agentspan.agents import stream

for event in stream(contract_reviewer, contract_path):
    if event.type == "tool_call":
        send_to_dashboard({"step": event.tool_name, "contract": contract_path})
    elif event.type == "done":
        send_to_dashboard({"status": "complete", "output": event.output})
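send_to_dashboard above is left undefined. For local development, a stub that just buffers the payloads (purely illustrative — a real implementation would POST to your dashboard) keeps the loop runnable:

```python
dashboard_events: list[dict] = []

def send_to_dashboard(payload: dict) -> None:
    """Stand-in sink: collect payloads instead of shipping them anywhere."""
    dashboard_events.append(payload)

send_to_dashboard({"step": "read_contract", "contract": "contracts/a.txt"})
send_to_dashboard({"status": "complete", "output": "{}"})
print(len(dashboard_events))  # → 2
```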

Testing

from agentspan.agents.testing import mock_run, MockEvent, expect

NDA_TEXT = """
MUTUAL NON-DISCLOSURE AGREEMENT
Parties: Acme Corp and Beta Inc.
Effective Date: 2026-01-15
Term: 3 years. Auto-renews annually unless terminated with 30 days notice.
Liability: Each party's liability is capped at $50,000.
IP: All shared information remains the property of the disclosing party.
"""

result = mock_run(
    contract_reviewer,
    "contracts/acme-nda.txt",
    events=[
        MockEvent.tool_call("read_contract", {"path": "contracts/acme-nda.txt"}),
        MockEvent.tool_result("read_contract", NDA_TEXT),
        MockEvent.tool_call("save_review", {
            "path": "contracts/acme-nda.txt",
            "review_json": '{"file": "contracts/acme-nda.txt", "contract_type": "NDA", ...}'
        }),
        MockEvent.tool_result("save_review", {"saved": True}),
        MockEvent.done('{"file": "contracts/acme-nda.txt", "contract_type": "NDA", "risk_level": "low", ...}'),
    ]
)

expect(result).completed().used_tool("read_contract").used_tool("save_review")
# mock_run returns plain string output
assert "NDA" in result.output
assert "risk_level" in result.output
assert '"low"' in result.output