Crash and resume
The problem: Most agent frameworks run the agent loop inside your process. If your process crashes — or you deploy a new version, restart a pod, or lose a network connection — the agent’s in-flight work is gone.
How Agentspan solves it: The agent loop runs on the Agentspan server, not in your process. Your worker registers tools and polls for tasks. The agent state lives on the server. Your process can die and restart freely.
How it works
```
Your process                     Agentspan server
──────────────                   ────────────────────────────
start(agent, prompt)   ──►       Creates workflow, starts agent loop
                                 LLM call → tool scheduled → worker executes
Worker polls tasks     ◄──       Dispatch: run_my_tool(input)
Worker returns result  ──►       Continue agent loop
...
process crashes                  Agent loop continues on server
                                 Next tool call is scheduled
Worker restarts        ◄──       Task is still queued, picked up on reconnect
                                 Agent loop resumes from where it was
```
The Conductor engine underlying Agentspan has durable execution built in — the same engine that powers workflows at Netflix, LinkedIn, and Tesla.
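The crash-and-resume sequence above can be modeled with a toy in-memory queue. This is a conceptual sketch of the durable-execution idea only, not Agentspan's actual engine; all names here are illustrative:

```python
# Conceptual sketch: the "server" owns all state; workers are stateless pollers.
# A worker crash loses nothing, because tasks live in the server's queue.
from collections import deque

class Server:
    """Stands in for the Agentspan server: holds the task queue and results."""
    def __init__(self, tasks):
        self.queue = deque(tasks)   # durable task state
        self.results = []           # durable results

    def poll(self):
        return self.queue.popleft() if self.queue else None

    def complete(self, task, result):
        self.results.append((task, result))

def run_worker(server, max_tasks):
    """A worker processes some tasks, then 'crashes' (simply returns)."""
    for _ in range(max_tasks):
        task = server.poll()
        if task is None:
            return
        server.complete(task, f"done:{task}")

server = Server(["chunk-1", "chunk-2", "chunk-3"])
run_worker(server, max_tasks=1)   # worker dies after one task
run_worker(server, max_tasks=10)  # replacement worker picks up the rest
print(len(server.results))        # → 3: no work was lost across the "crash"
```

A real engine also re-dispatches tasks whose worker died mid-execution (via task timeouts); the sketch omits that detail.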
Example: long-running analysis agent
```python
from agentspan.agents import Agent, tool, start

@tool
def analyze_chunk(chunk_id: int, data: str) -> dict:
    """Analyze a data chunk and return metrics."""
    # Your actual analysis logic here
    return {"chunk_id": chunk_id, "processed": True, "metrics": {"count": len(data)}}

@tool
def aggregate_results(results: list) -> dict:
    """Aggregate metrics from all chunks into a final report."""
    return {"total_chunks": len(results), "summary": "Analysis complete"}

agent = Agent(
    name="data_analysis_agent",
    model="openai/gpt-4o",
    tools=[analyze_chunk, aggregate_results],
    instructions="""Analyze data in chunks using analyze_chunk, then aggregate with aggregate_results.
    Process each chunk sequentially. Report progress as you go.""",
)

# Fire and forget — returns immediately
handle = start(agent, "Analyze customer feedback dataset: chunk 1, chunk 2, chunk 3")
print(f"Started: {handle.execution_id}")
# STARTED execution_id=exec-f8a2c1

# Check status periodically
status = handle.get_status()
print(f"Status: {status.status}")  # RUNNING
print(f"Current step: {status.current_task}")
```
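If you would rather block until the run finishes than check once, the status call can be wrapped in a polling loop. A minimal sketch, assuming only the `get_status()` method and the `is_complete` flag used elsewhere on this page:

```python
import time

def wait_until_complete(handle, poll_interval=5.0, timeout=None):
    """Poll handle.get_status() until the run finishes.

    Works with any handle exposing get_status() -> status with an
    is_complete flag, as AgentHandle does in the examples on this page.
    """
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        status = handle.get_status()
        if status.is_complete:
            return status
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(f"run still {status.status} after {timeout}s")
        time.sleep(poll_interval)
```

Usage: `final = wait_until_complete(handle, poll_interval=10)`.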
Reconnecting after a crash
The execution ID is all you need to reconnect from any process, on any machine.
If your agent has no @tool functions (LLM-only agent), reconnecting is one line:
```python
from agentspan.agents import AgentRuntime, AgentHandle

# Your original process crashed. New process starts:
runtime = AgentRuntime()  # connects to the existing server

# Reconnect to the in-flight run using its execution ID
handle = AgentHandle(execution_id="exec-f8a2c1", runtime=runtime)
result = handle.stream().get_result()
print("Completed")
```
If your agent has @tool functions, the reconnecting process must also register those workers — otherwise the workflow will hang waiting for a worker that never arrives:
```python
from agentspan.agents import Agent, tool, AgentRuntime, AgentHandle

# Re-define (or import) the same agent and tools
@tool
def analyze_chunk(chunk_id: int, data: str) -> dict:
    """Analyze a data chunk."""
    return {"chunk_id": chunk_id, "processed": True}

agent = Agent(
    name="data_analysis_agent",
    model="openai/gpt-4o",
    tools=[analyze_chunk],
    instructions="...",
)

with AgentRuntime() as runtime:
    # Start workers BEFORE reconnecting — this starts polling for tool tasks
    runtime.serve(agent, blocking=False)

    # Now reconnect to the in-flight workflow
    handle = AgentHandle(execution_id="exec-f8a2c1", runtime=runtime)
    status = handle.get_status()
    print(status.status)  # RUNNING (still going on the server)

    # Wait for the result
    result = handle.stream().get_result()
    print("Completed")
```
The agent never noticed your process crashed. It was running on the server the whole time.
Checking status from the CLI
```shell
agentspan agent status exec-f8a2c1
# RUNNING  step 847 / 3000  elapsed 4m32s
```
Production pattern: separate worker from invoker
In production, keep the worker process (which handles tool calls) separate from the invoker (which starts runs):
```python
# worker.py — runs continuously, handles tool execution
import time

from agentspan.agents import Agent, tool, AgentRuntime, configure

configure(server_url="http://agentspan-server:6767")

@tool
def analyze_chunk(chunk_id: int, data: str) -> dict:
    """Analyze a data chunk and return metrics."""
    return {"chunk_id": chunk_id, "processed": True}

agent = Agent(name="data_analysis_agent", model="openai/gpt-4o", tools=[analyze_chunk])

runtime = AgentRuntime()
runtime.serve(agent, blocking=False)  # registers tool workers, starts polling

# Keep polling forever
while True:
    time.sleep(60)
```
```python
# invoker.py — runs once per job (REST endpoint, cron, CLI, etc.)
from agentspan.agents import start, configure

# The Agent definition must be available here too; import it from a module
# shared with worker.py rather than re-declaring it in both places.
from analysis.agents import agent  # hypothetical shared module

configure(server_url="http://agentspan-server:6767")

handle = start(agent, "Analyze the dataset")
print(f"Job ID: {handle.execution_id}")
# Store this ID — use it to reconnect or check status later
```
Idempotency: never re-process completed work
Use get_status() to skip work that’s already done before starting a new run:
```python
from agentspan.agents import start, AgentRuntime, AgentHandle

def ensure_analysis_running(workflow_id: str | None, agent, prompt: str):
    """Start a new run or reconnect to an existing one."""
    if workflow_id:
        runtime = AgentRuntime()
        handle = AgentHandle(execution_id=workflow_id, runtime=runtime)
        status = handle.get_status()
        if status.is_complete:
            print("Already done")
            return handle
        if status.is_running or status.is_waiting:
            print(f"Still running: {status.status}")
            return handle
    # Start fresh
    return start(agent, prompt)
```
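The branching inside `ensure_analysis_running` reduces to a pure decision function that can be unit-tested without a server. A sketch mirroring the `is_complete` / `is_running` / `is_waiting` flags used above:

```python
def next_action(status) -> str:
    """Classify what to do with an existing run before starting a new one."""
    if status is None:
        return "start"           # no prior run recorded
    if status.is_complete:
        return "reuse_result"    # work already done; don't re-process
    if status.is_running or status.is_waiting:
        return "reconnect"       # attach to the in-flight run
    return "start"               # failed/terminated: safe to start fresh
```

Keeping the decision separate from the side effects (creating runtimes, starting runs) makes the idempotency guarantee easy to test.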
Full stream with reconnect
Stream events from a run — whether it’s new or already in progress:
```python
from agentspan.agents import AgentRuntime, AgentHandle

runtime = AgentRuntime()
handle = AgentHandle(execution_id="exec-f8a2c1", runtime=runtime)

for event in handle.stream():
    if event.type == "tool_call":
        print(f"→ {event.tool_name}({event.args})")
    elif event.type == "tool_result":
        print(f"← {event.tool_name}: {event.result}")
    elif event.type == "done":
        print(f"\nResult: {event.output}")
        break
```
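If the handler grows beyond printing, the per-type branches can become a dispatch table. A sketch using only the event fields shown in the loop above; unknown event types pass through unchanged:

```python
def format_event(event) -> str:
    """Render a stream event as a log line, using the fields shown above."""
    formatters = {
        "tool_call":   lambda e: f"→ {e.tool_name}({e.args})",
        "tool_result": lambda e: f"← {e.tool_name}: {e.result}",
        "done":        lambda e: f"Result: {e.output}",
    }
    fmt = formatters.get(event.type)
    return fmt(event) if fmt else f"[{event.type}]"  # unknown types: placeholder
```

This keeps the stream loop itself to one line per event: `print(format_event(event))`.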