Orchestrator Design Guide
This guide provides best practices and requirements for implementing an AdCP:Buy orchestrator that properly handles asynchronous operations, pending states, and human-in-the-loop workflows.

Core Design Principles
1. Asynchronous First
The AdCP:Buy protocol is inherently asynchronous. Operations may take seconds, hours, or even days to complete.
DO:
- Design all operations as async/await
- Store operation state persistently
- Handle orchestrator restarts gracefully
- Implement proper timeout handling
DON'T:
- Assume immediate completion
- Use synchronous blocking calls
- Store state only in memory
- Retry indefinitely without backoff
2. Task Status Management
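Operations progress through standardized status values. The server reports them in A2A's hyphenated spelling (e.g. input-required), while the local state machine below uses underscores (input_required):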
# AdCP Task Status Enum (aligned with A2A TaskState).
# Grouped by what the orchestrator has to do next for a task in that state.
TASK_STATUSES = {
    # Still in flight
    "submitted",       # Long-running (hours to days) - provide webhook or poll
    "working",         # Processing (< 120 seconds) - poll frequently
    # Blocked on the buyer side
    "input-required",  # Need user input/approval - continue conversation
    "auth-required",   # Need authentication
    # Final outcomes
    "completed",       # Success - process results
    "failed",          # Error - handle appropriately
    "canceled",        # User canceled
}
# Key distinction: 'submitted' vs 'working' indicates expected completion time
3. State Machine Design
Implement proper state machines aligned with AdCP task statuses:
class OperationState(Enum):
    """Lifecycle states of an operation tracked by the orchestrator.

    REQUESTED and CALLING_ADCP are local bookkeeping states; the remaining
    members mirror the AdCP task status reported by the server, spelled
    with underscores.
    """

    # Local orchestrator states
    REQUESTED = "requested"  # Initial request received
    CALLING_ADCP = "calling_adcp"  # Making AdCP API call

    # AdCP task states (match server responses)
    SUBMITTED = "submitted"  # Long-running operation queued
    WORKING = "working"  # Short-term processing
    INPUT_REQUIRED = "input_required"  # Awaiting user input/approval
    COMPLETED = "completed"  # Success
    FAILED = "failed"  # Error
    CANCELED = "canceled"  # User cancellation


# Valid state transitions, keyed by OperationState value.
# Every state has an entry so a lookup never raises KeyError; terminal
# states (completed, failed, canceled) have no outgoing transitions.
VALID_TRANSITIONS = {
    # A request can fail before the AdCP call is ever made
    "requested": ["calling_adcp", "failed"],
    "calling_adcp": ["submitted", "working", "input_required", "completed", "failed"],
    "submitted": ["working", "completed", "failed", "canceled"],
    # In-flight and waiting tasks can be canceled too, not only submitted ones
    "working": ["completed", "failed", "input_required", "canceled"],
    "input_required": ["submitted", "working", "completed", "failed", "canceled"],
    "completed": [],
    "failed": [],
    "canceled": [],
}


def is_valid_transition(current, new):
    """Return True if moving from state value ``current`` to ``new`` is allowed.

    Unknown ``current`` values are treated as having no valid transitions.
    """
    return new in VALID_TRANSITIONS.get(current, ())
Implementation Requirements
1. Operation Tracking
Store all operation requests with comprehensive tracking:
import uuid
from datetime import datetime, timezone


class OperationTracker:
    """Persist AdCP operations and their status in a document store.

    NOTE(review): ``db.operations`` is assumed to be an async, Motor/MongoDB
    style collection (``insert_one``, ``update_one``, ``find_one``,
    ``find(...).to_list``) — inferred from the calls below.
    """

    # Local statuses that still need monitoring. Both spellings of "input
    # required" are included: A2A-aligned servers report "input-required",
    # while the local state machine uses "input_required".
    PENDING_STATUSES = ("submitted", "working", "input_required", "input-required")

    def __init__(self, db):
        self.db = db

    @staticmethod
    def _now():
        """Return the current time as a timezone-aware UTC datetime.

        Aware timestamps are required because webhook timestamps parsed from
        ISO strings carry an offset, and comparing naive with aware datetimes
        raises TypeError.
        """
        return datetime.now(timezone.utc)

    async def create_operation(self, operation_type, request_data, webhook_config=None, status="requested"):
        """Insert a new operation record and return its generated id.

        ``status`` defaults to "requested"; callers adopting an already-running
        server task (e.g. during restart reconciliation) may pass another value.
        """
        now = self._now()
        operation = {
            "id": str(uuid.uuid4()),
            "type": operation_type,
            "status": status,
            "request": request_data,
            "webhook_config": webhook_config,
            "created_at": now,
            "updated_at": now,
            "task_id": None,
            "context_id": None,
            "result": None,
            "error": None,
        }
        await self.db.operations.insert_one(operation)
        return operation["id"]

    async def update_status(self, operation_id, status, **kwargs):
        """Set ``status`` (plus any extra fields in kwargs) on an operation.

        ``updated_at`` is stamped with the current time unless the caller
        supplies its own value in kwargs (e.g. a webhook's event time).
        """
        update = {"status": status, "updated_at": self._now()}
        update.update(kwargs)
        await self.db.operations.update_one({"id": operation_id}, {"$set": update})

    async def get_operation(self, operation_id):
        """Return the operation with this id, or None if it does not exist."""
        return await self.db.operations.find_one({"id": operation_id})

    async def find_by_task_id(self, task_id):
        """Return the operation linked to an AdCP task id, or None."""
        return await self.db.operations.find_one({"task_id": task_id})

    async def get_pending_operations(self):
        """Get all operations that need monitoring."""
        return await self.db.operations.find(
            {"status": {"$in": list(self.PENDING_STATUSES)}}
        ).to_list(length=None)

    async def reconcile_with_server(self, adcp_client):
        """Compare locally pending operations with the server's pending tasks (tasks/list).

        Returns the task ids pending only on the server (``orphaned_on_server``),
        those pending only locally (``missing_from_server``) and both counts.
        """
        # NOTE(review): A2A-aligned servers may expect "input-required" in this filter.
        server_tasks = await adcp_client.call('tasks/list', {
            'filters': {'statuses': ['submitted', 'working', 'input_required']}
        })
        tasks = server_tasks.get('tasks', [])
        server_task_ids = {task['task_id'] for task in tasks}

        local_operations = await self.get_pending_operations()
        local_task_ids = {op['task_id'] for op in local_operations if op.get('task_id')}

        return {
            'orphaned_on_server': server_task_ids - local_task_ids,
            'missing_from_server': local_task_ids - server_task_ids,
            'total_pending_server': len(tasks),
            'total_pending_local': len(local_operations),
        }
2. Async Operation Handler
Implement comprehensive async operation handling:
class AsyncOperationHandler:
    """Route AdCP task responses by status and poll unfinished tasks.

    Collaborators (interfaces inferred from the calls below):
      adcp_client -- ``await call(method, params)``
      tracker     -- ``update_status(...)`` and ``get_operation(...)``
      notifier    -- ``notify_submitted_with_webhook``, ``notify_submitted_polling``
                     and ``notify_working``
    ``_handle_completed``, ``_handle_failed`` and ``_handle_input_required``
    are not shown here and must be supplied by the implementation.
    """

    # Statuses after which a task will not change again.
    TERMINAL_STATUSES = frozenset({"completed", "failed", "canceled"})

    def __init__(self, adcp_client, tracker, notifier):
        self.adcp = adcp_client
        self.tracker = tracker
        self.notifier = notifier
        self.polling_tasks = {}  # task_id -> asyncio.Task currently polling that task

    @staticmethod
    def _normalize_status(status):
        """Map A2A-style hyphenated statuses ("input-required") to the local underscore spelling."""
        return status.replace("-", "_") if isinstance(status, str) else status

    async def handle_operation_response(self, operation_id, response):
        """Persist an AdCP response on the operation and route it by status.

        "canceled" and unrecognized statuses are persisted but not routed.
        """
        status = self._normalize_status(response.get("status"))
        update = {
            "result": response.get("result") if status == "completed" else None,
            "error": response.get("error") if status == "failed" else None,
        }
        # Only write identifiers the response actually carries, so a payload
        # without task_id/context_id doesn't erase the ones already stored.
        for key in ("task_id", "context_id"):
            if response.get(key) is not None:
                update[key] = response[key]
        await self.tracker.update_status(operation_id, status, **update)

        # Route based on status
        if status == "completed":
            await self._handle_completed(operation_id, response)
        elif status == "failed":
            await self._handle_failed(operation_id, response)
        elif status == "submitted":
            await self._handle_submitted(operation_id, response)
        elif status == "working":
            await self._handle_working(operation_id, response)
        elif status == "input_required":
            await self._handle_input_required(operation_id, response)

    async def _handle_submitted(self, operation_id, response):
        """Handle long-running operations: rely on the webhook if configured, else poll every 60 s."""
        task_id = response["task_id"]
        operation = await self.tracker.get_operation(operation_id)
        if operation and operation.get("webhook_config"):
            # Webhook will handle completion notification
            await self.notifier.notify_submitted_with_webhook(operation_id, task_id)
            return
        self._start_polling(operation_id, task_id, interval=60, initial_status="submitted")
        await self.notifier.notify_submitted_polling(operation_id, task_id)

    async def _handle_working(self, operation_id, response):
        """Handle short-term processing: poll every 5 s (should complete within 120 s)."""
        task_id = response["task_id"]
        self._start_polling(operation_id, task_id, interval=5, initial_status="working")
        await self.notifier.notify_working(operation_id, task_id)

    def _start_polling(self, operation_id, task_id, interval, initial_status=None):
        """Start a poller for ``task_id`` unless one is already running; return True if started.

        The poller itself routes status changes back through the handlers, so
        without this guard every poll would spawn yet another poller. An
        already-running poller keeps its original interval.
        """
        existing = self.polling_tasks.get(task_id)
        if existing is not None and not existing.done():
            return False
        self.polling_tasks[task_id] = asyncio.create_task(
            self._poll_for_completion(
                operation_id, task_id, interval=interval, initial_status=initial_status
            )
        )
        return True

    async def _poll_for_completion(self, operation_id, task_id, interval=60, initial_status=None):
        """Poll tasks/get until the task is terminal, polling fails, or the budget runs out.

        A response is routed through handle_operation_response only when its
        status differs from the last one seen (``initial_status`` seeds that),
        so users are not re-notified on every poll. A polling error or an
        exhausted budget marks the operation "failed".
        """
        max_polls = 1440 if interval == 60 else 24  # 24 hours at 60 s, else 24 polls (2 minutes at 5 s)
        last_status = self._normalize_status(initial_status)
        try:
            for _ in range(max_polls):
                try:
                    await asyncio.sleep(interval)
                    # Poll using tasks/get
                    task_response = await self.adcp.call('tasks/get', {
                        'task_id': task_id,
                        'include_result': True
                    })
                    status = self._normalize_status(task_response.get("status"))
                    if status != last_status:
                        await self.handle_operation_response(operation_id, task_response)
                        last_status = status
                except Exception as e:
                    await self.tracker.update_status(
                        operation_id,
                        "failed",
                        error=f"Polling error: {str(e)}"
                    )
                    return
                if status in self.TERMINAL_STATUSES:
                    # Returning here (rather than breaking) means a result on
                    # the final allowed poll is never overwritten as a timeout.
                    return
            await self.tracker.update_status(
                operation_id,
                "failed",
                error="Task polling timeout"
            )
        finally:
            # Runs on cancellation too; only remove the entry if it is this poller.
            if self.polling_tasks.get(task_id) is asyncio.current_task():
                del self.polling_tasks[task_id]
3. Task Monitoring
Implement efficient task monitoring with exponential backoff:
class TaskMonitor:
    """Watch human-in-the-loop (manual approval) tasks until they resolve.

    ``find_task``, ``handle_task_completion`` and ``handle_task_rejection`` are
    not shown here and must be supplied by the implementation.
    """

    def __init__(self, mcp_client):
        self.mcp = mcp_client
        self.monitoring_tasks = {}

    async def monitor_task(self, operation_id, task_id, *, initial_backoff=30,
                           max_backoff=300, max_wait=None):
        """Monitor a HITL task until completion.

        Polls ``get_pending_tasks`` (task_type ``manual_approval``), waiting
        ``initial_backoff`` seconds between checks and growing the wait by
        1.5x up to ``max_backoff``. The backoff grows after errors as well,
        so a failing server is not hit at a constant rate.

        Args:
            operation_id: local operation the task belongs to.
            task_id: id of the task to look for in the pending list.
            initial_backoff: first wait in seconds (default 30).
            max_backoff: cap on the wait in seconds (default 300).
            max_wait: optional overall budget in seconds; None (default)
                keeps watching until the task resolves.
        """
        loop = asyncio.get_running_loop()
        deadline = None if max_wait is None else loop.time() + max_wait
        backoff = initial_backoff

        while True:
            try:
                # Get task status
                response = await self.mcp.call_tool(
                    "get_pending_tasks",
                    {"task_type": "manual_approval"}
                )
                task = self.find_task(response["tasks"], task_id)
                if not task:
                    # Task disappeared - likely completed
                    break
                status = task["status"]
                if status == "completed":
                    await self.handle_task_completion(operation_id, task)
                    break
                if status == "failed":
                    await self.handle_task_rejection(operation_id, task)
                    break
                if status == "canceled":
                    # Terminal: nothing left to wait for
                    break
            except Exception as e:
                logger.error(f"Error monitoring task {task_id}: {e}")

            if deadline is not None and loop.time() >= deadline:
                logger.warning(f"Gave up monitoring task {task_id} after {max_wait}s")
                break

            # Exponential backoff
            await asyncio.sleep(backoff)
            backoff = min(backoff * 1.5, max_backoff)
4. Webhook Support with Reliability Patterns
Implement robust webhook endpoints following AdCP reliability patterns (see Core Concepts: Webhook Reliability):
import hashlib
import hmac
from datetime import datetime, timedelta, timezone


class WebhookHandler:
    """Authenticate and de-duplicate incoming AdCP webhook deliveries."""

    # How far an event timestamp may drift from "now", in either direction,
    # before the event is rejected as a possible replay.
    REPLAY_WINDOW = timedelta(minutes=5)

    def __init__(self, tracker, notifier, secret_key):
        self.tracker = tracker
        self.notifier = notifier
        self.secret_key = secret_key
        # event_id -> metadata. In production, use Redis/database; this dict grows without bound.
        self.processed_events = {}

    def verify_webhook_signature(self, payload: bytes, signature: str) -> bool:
        """Verify webhook authenticity.

        ``signature`` is expected as ``"sha256=<hex HMAC-SHA256 of payload>"``.
        A missing or empty signature (e.g. absent header) fails verification.
        """
        if not signature:
            return False
        expected_signature = hmac.new(
            self.secret_key.encode(),
            payload,
            hashlib.sha256
        ).hexdigest()
        # Constant-time comparison so response timing doesn't reveal how much of
        # a forged signature matched; bytes avoid TypeError on non-ASCII input.
        return hmac.compare_digest(
            signature.encode(), f"sha256={expected_signature}".encode()
        )

    async def is_replay_attack(self, timestamp: str, event_id: str) -> bool:
        """Return True if the event should be rejected as a replay.

        Rejects events whose timestamp is missing or unparseable, lies outside
        REPLAY_WINDOW of the current UTC time, or whose event_id was already
        processed.
        """
        try:
            event_time = datetime.fromisoformat(timestamp.replace('Z', '+00:00'))
        except (AttributeError, TypeError, ValueError):
            # The event's age can't be verified, so don't accept it
            return True
        if event_time.tzinfo is None:
            # NOTE(review): offset-less timestamps are assumed to be UTC
            event_time = event_time.replace(tzinfo=timezone.utc)
        # Aware "now": subtracting an aware event time from a naive now raises TypeError
        if abs(datetime.now(timezone.utc) - event_time) > self.REPLAY_WINDOW:
            return True
        # Check if we've processed this event before
        return event_id in self.processed_events
@app.post("/webhooks/adcp/{user_id}")
async def adcp_webhook(user_id: str, request: Request):
    """Handle AdCP task status updates with reliability patterns.

    Verifies the HMAC signature over the raw body, answers duplicates as
    already processed, drops events outside the replay window, then hands the
    payload to process_adcp_webhook.

    NOTE(review): the ``(body, status)`` tuples below follow Flask/Quart
    conventions. FastAPI serializes a returned tuple as a JSON array with
    HTTP 200, so on FastAPI return a JSONResponse (or raise HTTPException)
    for the 401 to actually be sent.
    """
    # Verify webhook signature against the exact bytes received
    signature = request.headers.get('x-adcp-signature')
    payload = await request.body()
    if not webhook_handler.verify_webhook_signature(payload, signature):
        return {"error": "Invalid signature"}, 401

    data = await request.json()
    event_id = data["event_id"]

    # Idempotent processing comes first: a redelivered event is a duplicate,
    # not an attack. (is_replay_attack also rejects known event IDs, so checking
    # it first made this branch unreachable.)
    if event_id in webhook_handler.processed_events:
        return {"status": "already_processed"}, 200

    # Prevent replay attacks (timestamp outside the accepted window)
    if await webhook_handler.is_replay_attack(data["timestamp"], event_id):
        return {"status": "ignored", "reason": "replay_attack"}, 200

    try:
        # Record event as processed immediately, so a concurrent redelivery is skipped
        webhook_handler.processed_events[event_id] = {
            "timestamp": data["timestamp"],
            "task_id": data["task_id"]
        }
        # Process the webhook
        await process_adcp_webhook(user_id, data)
        return {"status": "processed"}, 200
    except Exception as e:
        # Log error (with traceback) but still return 200 to prevent retries
        logger.exception(f"Webhook processing error: {e}")
        return {"status": "error", "message": str(e)}, 200
def _as_aware(moment):
    """Return ``moment`` as a timezone-aware datetime.

    Naive values are interpreted as local time, matching naive
    ``datetime.now()`` stamps. Comparing naive with aware datetimes raises
    TypeError. NOTE(review): if the store returns naive *UTC* values (e.g.
    pymongo without tz_aware), convert with ``replace(tzinfo=timezone.utc)``
    instead.
    """
    return moment.astimezone() if moment.tzinfo is None else moment


async def process_adcp_webhook(user_id: str, data: dict):
    """Process AdCP webhook with sequence handling.

    Applies the webhook's status (and result/error) to the matching
    operation, ignores deliveries older than the operation's last update,
    and cancels active polling only once the task reaches a terminal status.
    """
    task_id = data["task_id"]
    # Normalize A2A-style "input-required" to the underscore spelling used locally
    current_status = data["current_status"].replace("-", "_")
    timestamp = data["timestamp"]

    # Find associated operation
    operation = await db.operations.find_one({"task_id": task_id})
    if not operation:
        logger.warning(f"Received webhook for unknown task: {task_id}")
        return

    # Check for out-of-order events using timestamps (both made timezone-aware)
    webhook_timestamp = _as_aware(datetime.fromisoformat(timestamp.replace('Z', '+00:00')))
    operation_timestamp = operation.get("updated_at")
    if operation_timestamp and _as_aware(operation_timestamp) >= webhook_timestamp:
        logger.info(f"Ignoring out-of-order webhook for task {task_id}")
        return

    # Update operation status
    update_data = {
        "status": current_status,
        "updated_at": webhook_timestamp
    }
    # Include result or error data
    if current_status == "completed" and "result" in data:
        update_data["result"] = data["result"]
    elif current_status == "failed" and "error" in data:
        update_data["error"] = data["error"]
    await tracker.update_status(operation["id"], **update_data)

    # Cancel active polling only when the task is final: a non-terminal webhook
    # doesn't guarantee a later one will arrive, so polling stays as the backup.
    if current_status in ("completed", "failed", "canceled"):
        polling_task = handler.polling_tasks.pop(task_id, None)
        if polling_task is not None:
            polling_task.cancel()

    # Notify user of status change
    await notifier.notify_status_change(user_id, operation["id"], current_status)
class ReliableWebhookOrchestrator:
    """Orchestrator with webhook reliability patterns.

    Relies on module-level ``tracker``/``logger``, and on
    ``_poll_for_completion``, ``temporarily_disable_webhooks`` and
    ``calculate_webhook_success_rate`` being provided elsewhere.
    """

    def __init__(self):
        self.webhook_timeout = timedelta(minutes=10)  # Max wait for webhook
        self.backup_polling_delay = timedelta(minutes=2)  # Start backup polling
        # Strong references to fire-and-forget tasks: the event loop keeps only
        # weak references, so an unreferenced task can be garbage-collected mid-run.
        self._background_tasks = set()

    def _spawn(self, coro):
        """Schedule ``coro`` as a task and keep a reference until it finishes."""
        task = asyncio.create_task(coro)
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)
        return task

    async def _handle_submitted_with_webhook(self, operation_id, task_id, estimated_completion):
        """Handle submitted task with webhook + backup polling.

        Two timers run: backup polling starts after ``backup_polling_delay``,
        and a warning is logged after ``webhook_timeout``. Whichever fires
        first while the task is unfinished starts the one and only poller.
        ``estimated_completion`` is currently unused.
        """
        polling_started = False

        async def still_pending():
            operation = await tracker.get_operation(operation_id)
            return operation["status"] not in ["completed", "failed", "canceled"]

        async def poll_once():
            # Check-and-set happens without an await in between, so the two
            # timers can never both start a poller.
            nonlocal polling_started
            if polling_started:
                return
            polling_started = True
            await self._poll_for_completion(operation_id, task_id, interval=60)

        async def backup_polling():
            # Start backup polling after delay (webhook should arrive first)
            await asyncio.sleep(self.backup_polling_delay.total_seconds())
            if await still_pending() and not polling_started:
                # Webhook didn't arrive, start polling
                logger.info(f"Starting backup polling for task {task_id}")
                await poll_once()

        async def webhook_timeout():
            await asyncio.sleep(self.webhook_timeout.total_seconds())
            if await still_pending():
                logger.warning(f"Webhook timeout for task {task_id}, falling back to polling")
                await poll_once()  # no-op if backup polling is already running

        self._spawn(backup_polling())
        self._spawn(webhook_timeout())

    async def health_check_webhooks(self):
        """Periodic health check for webhook delivery."""
        # Check if recent operations with webhooks completed via webhook vs polling
        recent_ops = await tracker.get_recent_operations_with_webhooks()
        if not recent_ops:
            # No webhook traffic to judge; don't disable webhooks on an empty sample
            return
        webhook_success_rate = calculate_webhook_success_rate(recent_ops)
        if webhook_success_rate < 0.8:  # Less than 80% webhook success
            logger.warning(f"Webhook success rate low: {webhook_success_rate:.2%}")
            # Consider disabling webhooks temporarily
            await self.temporarily_disable_webhooks()
Webhook Best Practices for Orchestrators
- Always implement polling backup - Never rely solely on webhooks
- Use exponential backoff - For backup polling when webhooks fail
- Monitor webhook health - Track success rates and disable if needed
- Handle duplicates gracefully - Use event IDs for idempotent processing
- Implement proper timeouts - Don’t wait forever for webhook delivery
- Verify webhook authenticity - Always validate signatures
- Log webhook events - Maintain audit trail for debugging
5. User Communication
Keep users informed about pending operations:
class UserNotifier:
    """Send operation lifecycle messages to end users.

    Delivery goes through ``send_notification(user_id, message)``, which is
    expected to be implemented elsewhere.
    """

    async def notify_pending_approval(self, user_id, operation):
        """Tell the user their operation is waiting on publisher approval."""
        payload = dict(
            type="pending_approval",
            operation_id=operation["id"],
            operation_type=operation["type"],
            message="Your media buy requires publisher approval",
            estimated_time="2-4 hours",
            created_at=operation["created_at"],
        )
        await self.send_notification(user_id, payload)

    async def notify_approval(self, user_id, operation):
        """Tell the user their operation was approved, including the new media buy ID."""
        payload = dict(
            type="operation_approved",
            operation_id=operation["id"],
            message="Your media buy has been approved and created",
            media_buy_id=operation["result"]["media_buy_id"],
        )
        await self.send_notification(user_id, payload)
Example Orchestrator Flow
class AdCPOrchestrator:
    """End-to-end example: create media buys and follow them to completion.

    Relies on ``AdCPClient``, ``db``, ``logger`` and ``get_webhook_token``
    being provided elsewhere.
    """

    def __init__(self):
        self.adcp = AdCPClient()  # MCP or A2A client
        self.tracker = OperationTracker(db)
        self.handler = AsyncOperationHandler(self.adcp, self.tracker, UserNotifier())
        self.webhook_base_url = "https://orchestrator.com/webhooks"

    async def create_media_buy(self, user_id, request, enable_webhook=True):
        """Create a media buy with full async handling.

        Records the operation, calls AdCP (with a protocol-level webhook
        config if ``enable_webhook``), routes the response through the
        handler and returns a status-specific summary for the user. If the
        call or routing raises, the operation is marked "failed" and the
        exception is re-raised.
        """
        # 1. Prepare webhook configuration
        webhook_config = await self._build_webhook_config(user_id) if enable_webhook else None

        # 2. Create operation record
        operation_id = await self.tracker.create_operation(
            "create_media_buy",
            request,
            webhook_config=webhook_config
        )

        try:
            # 3. Call AdCP with protocol-level webhook config
            response = await self._send_create_media_buy(request, webhook_config)
            # 4. Handle response with unified status handling
            await self.handler.handle_operation_response(operation_id, response)
        except Exception as e:
            await self.tracker.update_status(operation_id, "failed", error=str(e))
            raise

        # 5. Return appropriate response to user. Built outside the try, so a
        # formatting problem can't overwrite a status the handler already stored.
        return self._user_response(operation_id, response, webhook_config)

    async def _build_webhook_config(self, user_id):
        """Return the webhook config pointing AdCP back at this orchestrator for user_id."""
        return {
            "webhook_url": f"{self.webhook_base_url}/adcp/{user_id}",
            "webhook_auth": {
                "type": "bearer",
                "credentials": await self.get_webhook_token(user_id)
            }
        }

    async def _send_create_media_buy(self, request, webhook_config):
        """Send create_media_buy over MCP or A2A, depending on the client protocol."""
        if self.adcp.protocol == "mcp":
            return await self.adcp.call("create_media_buy", request, webhook_config)
        # A2A
        return await self.adcp.send({
            "message": {
                "parts": [{
                    "kind": "data",
                    "data": {
                        "skill": "create_media_buy",
                        "parameters": request
                    }
                }]
            },
            "push_notification_config": webhook_config
        })

    @staticmethod
    def _user_response(operation_id, response, webhook_config):
        """Summarize an AdCP response for the end user, keyed by status.

        Statuses other than the five handled explicitly (e.g. "canceled",
        "auth_required") are reported as-is rather than mislabeled "failed".
        """
        status = response.get("status")
        if isinstance(status, str):
            status = status.replace("-", "_")  # A2A "input-required" -> "input_required"
        base = {"operation_id": operation_id, "status": status}

        if status == "completed":
            # The ID may be top-level or inside "result" (where the handler stores results)
            result = response.get("result") or {}
            return {
                **base,
                "media_buy_id": response.get("media_buy_id", result.get("media_buy_id")),
                "creative_deadline": response.get("creative_deadline", result.get("creative_deadline")),
            }
        if status == "submitted":
            return {
                **base,
                "task_id": response.get("task_id"),
                "message": response.get("message"),
                "webhook_configured": webhook_config is not None,
            }
        if status == "working":
            return {
                **base,
                "task_id": response.get("task_id"),
                "message": "Processing, will complete within 2 minutes",
            }
        if status == "input_required":
            return {
                **base,
                "message": response.get("message"),
                "context_id": response.get("context_id"),
            }
        if status == "failed":
            return {**base, "error": response.get("detail") or response.get("error")}
        return {**base, "message": response.get("message")}

    async def webhook_handler(self, user_id, payload):
        """Handle webhook notifications from AdCP server."""
        task_id = payload["task_id"]

        # Find operation by task_id
        operation = await self.tracker.find_by_task_id(task_id)
        if not operation:
            logger.warning(f"Received webhook for unknown task: {task_id}")
            return

        # Route the whole payload: it carries the status and task_id, whereas
        # payload["result"] alone has no status and would store status=None.
        response = dict(payload)
        if response.get("status") is None:
            result = payload.get("result")
            response["status"] = payload.get("current_status") or (
                result.get("status") if isinstance(result, dict) else None
            )
        await self.handler.handle_operation_response(operation["id"], response)

    async def reconcile_state_on_startup(self):
        """Recover from orchestrator restart by syncing with server.

        Adopts tasks pending only on the server, then resumes polling for
        every locally pending operation with a task_id. In-memory pollers do
        not survive a restart. Polling also fetches the final state of tasks
        that finished while the orchestrator was down ("missing_from_server").
        """
        reconciliation = await self.tracker.reconcile_with_server(self.adcp)
        logger.info(f"State reconciliation: {reconciliation}")

        # Resume monitoring for orphaned server tasks
        for task_id in reconciliation["orphaned_on_server"]:
            # Placeholder operation: the original type is unknown, and the real
            # status is set by update_status right after creation.
            operation_id = await self.tracker.create_operation("unknown", {})
            await self.tracker.update_status(operation_id, "submitted", task_id=task_id)
            self._resume_polling(operation_id, task_id)

        for operation in await self.tracker.get_pending_operations():
            if operation.get("task_id"):
                self._resume_polling(operation["id"], operation["task_id"])

    def _resume_polling(self, operation_id, task_id):
        """Start a default-interval poller for task_id unless one is already running.

        The task is registered in handler.polling_tasks, which keeps a
        reference to it and lets webhooks cancel it.
        """
        existing = self.handler.polling_tasks.get(task_id)
        if existing is not None and not existing.done():
            return
        self.handler.polling_tasks[task_id] = asyncio.create_task(
            self.handler._poll_for_completion(operation_id, task_id)
        )
Best Practices
1. Persistent Storage
Always use persistent storage for operation state:
- Database (MongoDB, PostgreSQL)
- Message queue (Redis, RabbitMQ)
- Distributed cache (Redis Cluster)
2. Idempotency
Make all operations idempotent:
async def create_media_buy_idempotent(self, request, user_id=None):
    """Create a media buy at most once per purchase-order number.

    An in-flight or completed ``create_media_buy`` operation with the same
    ``po_number`` short-circuits the call:
    - a completed one returns its stored result;
    - an in-flight one returns ``{"operation_id", "status"}``.
    Failed or canceled attempts do not block a retry. Requests without a
    ``po_number`` cannot be de-duplicated and are always sent.

    ``user_id`` is forwarded to ``create_media_buy(user_id, request)``,
    matching the orchestrator's signature.
    """
    po_number = request.get("po_number")
    if po_number is not None:
        # Check if already exists, using statuses the orchestrator actually
        # records (see TASK_STATUSES / OperationState), in both spellings.
        existing = await self.db.operations.find_one({
            "type": "create_media_buy",
            "request.po_number": po_number,
            "status": {"$in": ["submitted", "working", "input_required", "input-required", "completed"]}
        })
        if existing:
            if existing["status"] == "completed":
                return existing["result"]
            # Still in flight: its result is not available yet
            return {"operation_id": existing["id"], "status": existing["status"]}

    # Proceed with creation
    return await self.create_media_buy(user_id, request)
3. Timeout Handling
Implement reasonable timeouts:
# Longest an operation of each type may stay pending before it is treated as timed out.
OPERATION_TIMEOUTS = dict(
    create_media_buy=timedelta(hours=24),
    update_media_buy=timedelta(hours=12),
    creative_approval=timedelta(days=2),  # 48 hours
)
4. Error Recovery
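Implement retry logic with exponential backoff for transient errors (and add a circuit breaker if you need to stop calling a persistently failing server):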
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(min=1, max=60),
    retry=retry_if_exception_type(TransientError)
)
async def call_adcp_api(self, tool, params):
    """Call an AdCP tool, turning retryable failures into TransientError.

    The ``@retry`` decorator re-invokes the call up to 3 attempts with
    exponential backoff, but only for TransientError. Any other exception
    propagates immediately. NOTE(review): with tenacity's defaults, the
    final failure surfaces as ``tenacity.RetryError``, not TransientError.
    """
    try:
        return await self.mcp.call_tool(tool, params)
    except RateLimitError as e:
        # Back off gracefully; chain the cause so the original error stays in the traceback
        raise TransientError("Rate limited") from e
    except NetworkError as e:
        # Retry network errors
        raise TransientError("Network error") from e
5. Monitoring and Alerting
Track key metrics:
- Pending operation count by type
- Average approval time
- Rejection rate
- Task timeout rate
- API error rate
Testing Considerations
1. Simulate Pending States
Test handling of all pending states:
@pytest.mark.asyncio
async def test_manual_approval_flow():
    """A buy queued for manual approval comes back "submitted" and is polled.

    Uses the orchestrator's real API (``adcp``/``handler``) and an AdCP status
    ("submitted"). The old ``mcp``/``monitor`` attributes and the
    "pending_manual" status do not exist. Assumes an MCP client
    (``adcp.protocol == "mcp"``) and a ``create_request`` fixture holding a
    valid create_media_buy request.
    """
    from unittest.mock import AsyncMock, patch

    orchestrator = AdCPOrchestrator()
    # Simulate a seller that queues the buy for human approval
    approval_response = {
        "status": "submitted",
        "task_id": "task_123",
        "message": "Manual approval required"
    }
    with patch.object(orchestrator.adcp, "call", new=AsyncMock(return_value=approval_response)):
        result = await orchestrator.create_media_buy(
            "user_1",
            create_request,
            enable_webhook=False
        )

    assert result["status"] == "submitted"
    assert result["task_id"] == "task_123"
    # Without a webhook, the handler falls back to polling the task
    poller = orchestrator.handler.polling_tasks.get("task_123")
    assert poller is not None
    poller.cancel()  # don't leave a long-sleeping poller running after the test
2. Test Timeout Scenarios
Ensure proper timeout handling:
@pytest.mark.asyncio
async def test_operation_timeout():
    """An operation left pending past its time limit is marked timed out.

    The asyncio marker is required: without it pytest never awaits this
    coroutine, and the test silently does nothing.
    """
    # Create operation that will timeout
    operation_id = await tracker.create_operation(...)
    # Fast forward past the 24 h create_media_buy limit
    with freeze_time() as frozen:
        frozen.move_to(datetime.now() + timedelta(hours=25))
        # Check timeout handler
        await timeout_handler.check_timeouts()
        # Verify marked as timed out
        op = await tracker.get_operation(operation_id)
        # NOTE(review): "timed_out" is not among the statuses defined in this
        # guide (the polling timeout path records "failed" with error "Task
        # polling timeout"); align with whatever check_timeouts() writes.
        assert op["status"] == "timed_out"
Conclusion
Building a robust AdCP:Buy orchestrator requires:
- Asynchronous design throughout
- Proper state management
- Graceful handling of pending states
- User communication
- Monitoring and observability