Example: Batch document processor
Use case: Contract review at scale. A legal or compliance team needs to review hundreds of contracts — extract key terms, flag risks, and produce a structured summary for each one. The job must survive process crashes and be resumable without re-processing completed files.
Full code
from enum import Enum
import json
from pathlib import Path

from pydantic import BaseModel, Field

from agentspan.agents import Agent, tool, start
# ── Output schema ─────────────────────────────────────────────────────────────
class RiskLevel(str, Enum):
    """Closed set of contract risk classifications.

    Mixing in ``str`` keeps members comparable to plain strings
    (``RiskLevel.LOW == "low"``), so existing string-based callers keep
    working, while ``Enum`` rejects values outside the set
    (``RiskLevel("bogus")`` raises ``ValueError``).
    """

    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
class ContractReview(BaseModel):
    """Structured result of reviewing one contract file."""

    file: str                    # path of the reviewed contract file
    contract_type: str           # NDA, MSA, SOW, SaaS, etc.
    parties: list[str]           # entities named as parties to the contract
    effective_date: str | None   # None when the contract does not state one
    expiry_date: str | None
    auto_renewal: bool
    payment_terms: str | None
    liability_cap: str | None
    ip_ownership: str | None
    risks: list[str] = Field(default_factory=list)  # specific concerns found
    risk_level: str              # "low" | "medium" | "high"
    action_required: str | None  # What legal needs to do, if anything
# ── Tools ────────────────────────────────────────────────────────────────────
@tool
def read_contract(path: str) -> str:
    """Read a contract file and return its text content.

    The file is decoded as UTF-8; a decoding error propagates to the agent.
    """
    with open(path, encoding="utf-8") as fh:
        return fh.read()
@tool
def flag_for_review(path: str, reason: str, risk_level: str) -> dict:
    """Flag a contract for human legal review."""
    # NOTE(review): flags_db is not defined in this snippet — presumably a
    # module-level store supplied by the surrounding application; confirm.
    flags_db.insert({"path": path, "reason": reason, "risk": risk_level})
    return {"flagged": True, "path": path}
@tool
def save_review(path: str, review_json: str) -> dict:
    """Save the completed review to the output store."""
    # Raises json.JSONDecodeError if the agent supplies malformed JSON.
    review = json.loads(review_json)
    # NOTE(review): reviews_db is not defined in this snippet — presumably a
    # module-level store; upsert keyed by file path appears to make retries
    # idempotent — confirm against the store's semantics.
    reviews_db.upsert({"file": path}, review)
    return {"saved": True}
# ── Agent ─────────────────────────────────────────────────────────────────────
# Reviewer agent: reads a contract via its tools, produces a ContractReview,
# and flags medium/high-risk documents for human follow-up.
contract_reviewer = Agent(
    name="contract_reviewer",
    model="anthropic/claude-sonnet-4-6",
    output_type=ContractReview,  # final answer is validated against this schema
    tools=[read_contract, flag_for_review, save_review],
    instructions="""You are a paralegal specializing in technology contracts.
For each contract:
1. Read the full text
2. Extract all required fields into ContractReview
3. List specific risks (unusual clauses, missing protections, unfavorable terms)
4. Assign a risk level: low (standard terms), medium (some concerns), high (legal review needed)
5. If risk is medium or high: call flag_for_review with a clear reason
6. Call save_review with the completed JSON
Be precise about dates and monetary amounts.
If a field is not present in the contract, use null — do not guess.""",
)
# ── Batch runner ──────────────────────────────────────────────────────────────
def process_contracts(contract_dir: str, max_concurrent: int = 10):
    """Review every .pdf/.txt contract under *contract_dir* in bounded batches.

    Contracts already recorded in reviews_db are skipped, so the function can
    be killed and re-run without repeating completed work. At most
    *max_concurrent* workflows run at once; each batch is fully drained before
    the next one is launched.
    """
    root = Path(contract_dir)
    candidates = [p for pattern in ("**/*.pdf", "**/*.txt")
                  for p in root.glob(pattern)]
    print(f"Found {len(candidates)} contracts to process")

    # Skip already-completed files (idempotent restarts).
    completed = {r["file"] for r in reviews_db.find({"saved": True})}
    todo = [p for p in candidates if str(p) not in completed]
    print(f"{len(completed)} already done, {len(todo)} remaining")

    # Launch one slice of workflows, wait on all of them, then move on.
    offset = 0
    while offset < len(todo):
        current = todo[offset:offset + max_concurrent]
        offset += max_concurrent
        running = [(str(p), start(contract_reviewer, str(p))) for p in current]
        for contract_path, handle in running:
            try:
                result = handle.stream().get_result()
                # Unwrap the server envelope; fall back to the raw output.
                review_data = result.output.get('result', result.output)
                print(f" ✓ {Path(contract_path).name} [{review_data}]")
            except TimeoutError:
                print(f" ✗ {Path(contract_path).name} TIMEOUT — will retry next run")
            except Exception as e:
                print(f" ✗ {Path(contract_path).name} FAILED: {e}")
if __name__ == "__main__":
    # Script entry point: review everything under ./contracts/ with up to
    # 10 concurrent workflows.
    process_contracts("./contracts/", max_concurrent=10)
What this demonstrates
Idempotent restarts — the skip-already-completed logic means you can kill and restart the process at any time. Completed contracts aren’t re-processed; in-progress contracts resume from where the server left them.
Concurrent execution — start() launches each contract as a separate workflow. Up to max_concurrent run in parallel on the server, each with its own isolated state.
Progress visibility — while the batch runs, open http://localhost:6767 to see all active and completed executions in real time, filtered by agent name and status.
Per-file history — every contract review is stored with its full execution trace. A month from now you can replay any review, see exactly what the agent read, and audit the reasoning.
Variations
Resume a failed contract
Find failed executions via CLI or UI:
agentspan agent execution --name contract_reviewer --status FAILED --since 1d
Or open http://localhost:6767 to browse executions visually. Re-run the failed contract by passing the same path to start() again — the idempotent skip logic leaves already-completed contracts untouched, so only the failed one is processed.
Add a second-pass reviewer for high-risk contracts
# Second-pass agent used for contracts the first pass rated "high" risk.
senior_reviewer = Agent(
    name="senior_reviewer",
    model="openai/gpt-4o",
    output_type=ContractReview,
    instructions="You are a senior contracts attorney. Review the initial analysis...",
)
# Route by risk level after first pass
def process_contract(path: str):
    """Review one contract, escalating high-risk results to the senior agent."""
    # NOTE(review): `run` is not imported anywhere on this page — presumably
    # it comes from agentspan.agents; confirm before copying this snippet.
    result = run(contract_reviewer, path)
    # Unwrap the server envelope; fall back to the raw output.
    review_data = result.output.get('result', result.output)
    if isinstance(review_data, dict) and review_data.get('risk_level') == "high":
        # Second pass: feed the first review (stringified) to the senior agent.
        result = run(senior_reviewer, str(review_data))
    return result.output.get('result', result.output)
Stream progress to a dashboard
from agentspan.agents import stream

# Forward workflow events to a dashboard as they happen.
# NOTE(review): `contract_path` and `send_to_dashboard` are assumed to be
# defined by the surrounding application — confirm before copying.
for event in stream(contract_reviewer, contract_path):
    if event.type == "tool_call":
        # One progress update per tool invocation.
        send_to_dashboard({"step": event.tool_name, "contract": contract_path})
    elif event.type == "done":
        send_to_dashboard({"status": "complete", "output": event.output})
Testing
from agentspan.agents.testing import mock_run, MockEvent, expect

# Canned contract text returned by the mocked read_contract call.
NDA_TEXT = """
MUTUAL NON-DISCLOSURE AGREEMENT
Parties: Acme Corp and Beta Inc.
Effective Date: 2026-01-15
Term: 3 years. Auto-renews annually unless terminated with 30 days notice.
Liability: Each party's liability is capped at $50,000.
IP: All shared information remains the property of the disclosing party.
"""

# Replay a scripted event sequence instead of calling a real model.
result = mock_run(
    contract_reviewer,
    "contracts/acme-nda.txt",
    events=[
        MockEvent.tool_call("read_contract", {"path": "contracts/acme-nda.txt"}),
        MockEvent.tool_result("read_contract", NDA_TEXT),
        MockEvent.tool_call("save_review", {
            "path": "contracts/acme-nda.txt",
            "review_json": '{"file": "contracts/acme-nda.txt", "contract_type": "NDA", ...}'
        }),
        MockEvent.tool_result("save_review", {"saved": True}),
        MockEvent.done('{"file": "contracts/acme-nda.txt", "contract_type": "NDA", "risk_level": "low", ...}'),
    ]
)
# Assert the run completed and both tools were exercised.
expect(result).completed().used_tool("read_contract").used_tool("save_review")
# mock_run returns plain string output
assert "NDA" in result.output
assert "risk_level" in result.output
assert '"low"' in result.output