
Crash and resume

The problem: Most agent frameworks run the agent loop inside your process. If your process crashes — or you deploy a new version, restart a pod, or lose a network connection — the agent’s in-flight work is gone.

How Agentspan solves it: The agent loop runs on the Agentspan server, not in your process. Your worker registers tools and polls for tasks. The agent state lives on the server. Your process can die and restart freely.


How it works

Your process                Agentspan server
──────────────             ────────────────────────────
start(agent, prompt)  ──►  Creates workflow, starts agent loop
                           LLM call → tool scheduled → worker executes
Worker polls tasks ◄──────  Dispatch: run_my_tool(input)
Worker returns result ────► Continue agent loop
                           ...
process crashes            Agent loop continues on server
                           Next tool call is scheduled
Worker restarts ◄──────    Task is still queued, picked up on reconnect
                           Agent loop resumes from where it was

The Conductor engine underlying Agentspan has durable execution built in — the same engine that powers workflows at Netflix, LinkedIn, and Tesla.
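The dispatch cycle in the diagram above can be sketched as a toy in-memory simulation. The queue and function names here are illustrative only; in Agentspan the scheduling happens on the server, not in your process:

```python
import queue

# Toy stand-in for the server-side task queue (illustrative; the real
# scheduling lives on the Agentspan server, which is the whole point).
tasks = queue.Queue()
results = []

def server_schedule(tool_name, args):
    """The 'server' decides a tool call is needed and queues it."""
    tasks.put((tool_name, args))

def worker_poll_once(tools):
    """The worker pulls one task, runs the matching tool, records the result."""
    tool_name, args = tasks.get()
    results.append(tools[tool_name](**args))

server_schedule("run_my_tool", {"input": "data"})
worker_poll_once({"run_my_tool": lambda input: f"processed {input}"})
print(results)  # ['processed data']
```

Because the queue outlives any single poll, a worker that dies between `get` calls loses nothing: the next poll picks up where the last one left off. That is the property the server-side queue gives you for free.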


Example: long-running analysis agent

from agentspan.agents import Agent, tool, start

@tool
def analyze_chunk(chunk_id: int, data: str) -> dict:
    """Analyze a data chunk and return metrics."""
    # Your actual analysis logic here
    return {"chunk_id": chunk_id, "processed": True, "metrics": {"count": len(data)}}

@tool
def aggregate_results(results: list) -> dict:
    """Aggregate metrics from all chunks into a final report."""
    return {"total_chunks": len(results), "summary": "Analysis complete"}

agent = Agent(
    name="data_analysis_agent",
    model="openai/gpt-4o",
    tools=[analyze_chunk, aggregate_results],
    instructions="""Analyze data in chunks using analyze_chunk, then aggregate with aggregate_results.
    Process each chunk sequentially. Report progress as you go.""",
)

# Fire and forget — returns immediately
handle = start(agent, "Analyze customer feedback dataset: chunk 1, chunk 2, chunk 3")
print(f"Started: {handle.execution_id}")
# STARTED  execution_id=exec-f8a2c1

# Check status periodically
status = handle.get_status()
print(f"Status: {status.status}")  # RUNNING
print(f"Current step: {status.current_task}")

Reconnecting after a crash

The execution ID is all you need to reconnect from any process, on any machine.

If your agent has no @tool functions (LLM-only agent), reconnecting is one line:

from agentspan.agents import AgentRuntime, AgentHandle

# Your original process crashed. New process starts:
runtime = AgentRuntime()  # connects to the existing server

# Reconnect to the in-flight run using its execution ID
handle = AgentHandle(execution_id="exec-f8a2c1", runtime=runtime)
result = handle.stream().get_result()
print("Completed")

If your agent has @tool functions, the reconnecting process must also register those workers — otherwise the workflow will hang waiting for a worker that never arrives:

from agentspan.agents import Agent, tool, AgentRuntime, AgentHandle

# Re-define (or import) the same agent and tools
@tool
def analyze_chunk(chunk_id: int, data: str) -> dict:
    """Analyze a data chunk."""
    return {"chunk_id": chunk_id, "processed": True}

agent = Agent(
    name="data_analysis_agent",
    model="openai/gpt-4o",
    tools=[analyze_chunk],
    instructions="...",
)

with AgentRuntime() as runtime:
    # Start workers BEFORE reconnecting — this starts polling for tool tasks
    runtime.serve(agent, blocking=False)

    # Now reconnect to the in-flight workflow
    handle = AgentHandle(execution_id="exec-f8a2c1", runtime=runtime)

    status = handle.get_status()
    print(status.status)        # RUNNING (still going on the server)

    # Wait for the result
    result = handle.stream().get_result()
    print("Completed")

The agent never noticed your process crashed. It was running on the server the whole time.


Checking status from the CLI

agentspan agent status exec-f8a2c1
# RUNNING  step 847 / 3000  elapsed 4m32s

Production pattern: separate worker from invoker

In production, keep the worker process (which handles tool calls) separate from the invoker (which starts runs):

# worker.py — runs continuously, handles tool execution
import time
from agentspan.agents import Agent, tool, AgentRuntime, configure

configure(server_url="http://agentspan-server:6767")

@tool
def analyze_chunk(chunk_id: int, data: str) -> dict:
    """Analyze a data chunk and return metrics."""
    return {"chunk_id": chunk_id, "processed": True}

agent = Agent(name="data_analysis_agent", model="openai/gpt-4o", tools=[analyze_chunk])

runtime = AgentRuntime()
runtime.serve(agent, blocking=False)  # registers tool workers, starts polling

# Keep polling forever
while True:
    time.sleep(60)

# invoker.py — runs once per job (REST endpoint, cron, CLI, etc.)
from agentspan.agents import Agent, tool, start, configure

configure(server_url="http://agentspan-server:6767")

# Re-define (or import) the same agent as in worker.py; start() needs the
# agent definition, while tool execution stays in the worker process
@tool
def analyze_chunk(chunk_id: int, data: str) -> dict:
    """Analyze a data chunk and return metrics."""
    return {"chunk_id": chunk_id, "processed": True}

agent = Agent(name="data_analysis_agent", model="openai/gpt-4o", tools=[analyze_chunk])

handle = start(agent, "Analyze the dataset")
print(f"Job ID: {handle.execution_id}")
# Store this ID — use it to reconnect or check status later
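"Store this ID" can be as simple as a JSON file keyed by job name. A minimal sketch in plain Python; the file name and schema are illustrative, not part of Agentspan:

```python
import json
from pathlib import Path

JOBS_FILE = Path("agentspan_jobs.json")  # illustrative location

def record_job(job_name, execution_id):
    """Persist the execution ID so any later process can reconnect to the run."""
    jobs = json.loads(JOBS_FILE.read_text()) if JOBS_FILE.exists() else {}
    jobs[job_name] = execution_id
    JOBS_FILE.write_text(json.dumps(jobs, indent=2))

def lookup_job(job_name):
    """Return the stored execution ID, or None if this job was never started."""
    if not JOBS_FILE.exists():
        return None
    return json.loads(JOBS_FILE.read_text()).get(job_name)
```

After `start()`, call `record_job("feedback-analysis", handle.execution_id)`; a later process can feed `lookup_job("feedback-analysis")` straight into `AgentHandle(execution_id=...)`. In production you would use your database instead of a file, but the shape is the same: the execution ID is the only state the invoker needs to keep.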

Idempotency: never re-process completed work

Before starting a new run, use get_status() to check whether an existing run has already done the work:

from agentspan.agents import start, AgentRuntime, AgentHandle

def ensure_analysis_running(execution_id: str | None, agent, prompt: str):
    """Start a new run or reconnect to an existing one."""
    if execution_id:
        runtime = AgentRuntime()
        handle = AgentHandle(execution_id=execution_id, runtime=runtime)
        status = handle.get_status()
        if status.is_complete:
            print("Already done")
            return handle
        if status.is_running or status.is_waiting:
            print(f"Still running: {status.status}")
            return handle
    # Start fresh
    return start(agent, prompt)
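The branch order above matters: a completed or still-running ID is reused, and anything else (for example a failed run) falls through to a fresh start. The same decision, isolated as plain Python with a stub status object (FakeStatus is illustrative, not an Agentspan type):

```python
def decide(execution_id, status):
    """Classify what to do with a stored ID and its last-known status."""
    if execution_id is None:
        return "start_new"
    if status.is_complete:
        return "already_done"
    if status.is_running or status.is_waiting:
        return "reconnect"
    return "start_new"  # e.g. a failed run: start over

class FakeStatus:
    """Stub with the flags the idempotency check reads (illustrative)."""
    def __init__(self, complete=False, running=False, waiting=False):
        self.is_complete = complete
        self.is_running = running
        self.is_waiting = waiting

print(decide("exec-f8a2c1", FakeStatus(running=True)))  # reconnect
```

Keeping the decision as a pure function makes it easy to unit-test the restart policy without a server connection.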

Full stream with reconnect

Stream events from a run — whether it’s new or already in progress:

from agentspan.agents import AgentRuntime, AgentHandle

runtime = AgentRuntime()
handle = AgentHandle(execution_id="exec-f8a2c1", runtime=runtime)

for event in handle.stream():
    if event.type == "tool_call":
        print(f"→ {event.tool_name}({event.args})")
    elif event.type == "tool_result":
        print(f"← {event.tool_name}: {event.result}")
    elif event.type == "done":
        print(f"\nResult: {event.output}")
        break
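When a consumer handles many event types, the if/elif chain above scales poorly. A dictionary dispatch is a common alternative; this sketch is plain Python, with SimpleNamespace standing in for Agentspan's event objects:

```python
from types import SimpleNamespace

def dispatch(event, handlers):
    """Route a stream event to the handler registered for its type.
    Unknown types are ignored, which keeps the loop forward-compatible
    if the server starts emitting new event kinds."""
    handler = handlers.get(event.type)
    return handler(event) if handler else None

log = []
handlers = {
    "tool_call": lambda e: log.append(f"→ {e.tool_name}"),
    "tool_result": lambda e: log.append(f"← {e.tool_name}: {e.result}"),
    "done": lambda e: log.append(f"Result: {e.output}"),
}

dispatch(SimpleNamespace(type="tool_call", tool_name="analyze_chunk"), handlers)
dispatch(SimpleNamespace(type="done", output="done"), handlers)
print(log)  # ['→ analyze_chunk', 'Result: done']
```

In the real loop you would call `dispatch(event, handlers)` for each event from `handle.stream()` and break when the "done" handler fires.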