Documentation Index
Fetch the complete documentation index at: https://agenticadvertisingorg-changeset-release-main.mintlify.app/llms.txt
Use this file to discover all available pages before exploring further.
This guide covers best practices for building AdCP orchestrators that handle asynchronous operations, pending states, and human-in-the-loop workflows.
Core Design Principles
1. Asynchronous First
The AdCP protocol is inherently asynchronous. Operations may take seconds, hours, or even days to complete.
DO:
- Design all operations as async/await
- Store operation state persistently
- Handle orchestrator restarts gracefully
- Implement proper timeout handling
DON’T:
- Assume immediate completion
- Use synchronous blocking calls
- Store state only in memory
- Retry indefinitely without backoff
2. Status-Driven Logic
Operations progress through standardized status values:
TASK_STATUSES = {
"submitted", # Long-running (hours to days) - provide webhook or poll
"working", # Processing (< 120 seconds) - poll frequently
"input-required", # Need user input/approval - continue conversation
"completed", # Success - process results
"failed", # Error - handle appropriately
"canceled", # User canceled
"auth-required" # Need authentication
}
3. State Machine Design
Implement proper state machines aligned with AdCP task statuses:
class OperationState(Enum):
# Local orchestrator states
REQUESTED = "requested"
CALLING_ADCP = "calling_adcp"
# AdCP task states (match server responses)
SUBMITTED = "submitted"
WORKING = "working"
INPUT_REQUIRED = "input_required"
COMPLETED = "completed"
FAILED = "failed"
CANCELED = "canceled"
# Valid state transitions
VALID_TRANSITIONS = {
"requested": ["calling_adcp"],
"calling_adcp": ["submitted", "working", "input_required", "completed", "failed"],
"submitted": ["working", "completed", "failed", "canceled"],
"working": ["completed", "failed", "input_required"],
"input_required": ["submitted", "working", "completed", "failed"]
}
Operation Tracking
Persistent Storage
Store all operations with comprehensive tracking:
class OperationTracker:
def __init__(self, db):
self.db = db
async def create_operation(self, operation_type, request_data, webhook_config=None):
operation = {
"id": str(uuid.uuid4()),
"type": operation_type,
"status": "requested",
"request": request_data,
"webhook_config": webhook_config,
"created_at": datetime.now(),
"updated_at": datetime.now(),
"task_id": None,
"context_id": None,
"result": None,
"error": None
}
await self.db.operations.insert_one(operation)
return operation["id"]
async def update_status(self, operation_id, status, **kwargs):
update = {
"status": status,
"updated_at": datetime.now()
}
update.update(kwargs)
await self.db.operations.update_one(
{"id": operation_id},
{"$set": update}
)
async def get_pending_operations(self):
"""Get all operations that need monitoring"""
return await self.db.operations.find({
"status": {"$in": ["submitted", "working", "input_required"]}
}).to_list(length=None)
State Reconciliation
Sync local state with server on startup:
async def reconcile_with_server(self, adcp_client):
"""Sync local state with server using tasks/list"""
server_tasks = await adcp_client.call('tasks/list', {
'filters': {'statuses': ['submitted', 'working', 'input_required']}
})
server_task_ids = {task['task_id'] for task in server_tasks['tasks']}
local_operations = await self.get_pending_operations()
local_task_ids = {op['task_id'] for op in local_operations if op['task_id']}
return {
'orphaned_on_server': server_task_ids - local_task_ids,
'missing_from_server': local_task_ids - server_task_ids,
'total_pending_server': len(server_tasks['tasks']),
'total_pending_local': len(local_operations)
}
Async Operation Handler
Response Routing
Handle responses based on status:
class AsyncOperationHandler:
def __init__(self, adcp_client, tracker, notifier):
self.adcp = adcp_client
self.tracker = tracker
self.notifier = notifier
self.polling_tasks = {}
async def handle_operation_response(self, operation_id, response):
"""Handle any AdCP response with proper status routing"""
status = response.get("status")
# Update operation with response details
await self.tracker.update_status(
operation_id,
status,
task_id=response.get("task_id"),
context_id=response.get("context_id"),
result=response.get("result") if status == "completed" else None,
error=response.get("error") if status == "failed" else None
)
# Route based on status
if status == "completed":
await self._handle_completed(operation_id, response)
elif status == "failed":
await self._handle_failed(operation_id, response)
elif status == "submitted":
await self._handle_submitted(operation_id, response)
elif status == "working":
await self._handle_working(operation_id, response)
elif status == "input_required":
await self._handle_input_required(operation_id, response)
Submitted Operations
Handle long-running operations:
async def _handle_submitted(self, operation_id, response):
"""Handle long-running operations"""
task_id = response["task_id"]
# Check if webhook is configured
operation = await self.tracker.get_operation(operation_id)
webhook_config = operation.get("webhook_config")
if webhook_config:
# Webhook will handle completion notification
await self.notifier.notify_submitted_with_webhook(operation_id, task_id)
else:
# Start polling for completion
polling_task = asyncio.create_task(
self._poll_for_completion(operation_id, task_id, interval=60)
)
self.polling_tasks[task_id] = polling_task
Polling with Backoff
Implement efficient polling:
async def _poll_for_completion(self, operation_id, task_id, interval=60):
"""Poll task status until completion"""
max_polls = 1440 if interval == 60 else 24 # 24 hours or 2 minutes
poll_count = 0
while poll_count < max_polls:
try:
await asyncio.sleep(interval)
poll_count += 1
task_response = await self.adcp.call('tasks/get', {
'task_id': task_id,
'include_result': True
})
await self.handle_operation_response(operation_id, task_response)
if task_response["status"] in ["completed", "failed", "canceled"]:
break
except Exception as e:
await self.tracker.update_status(
operation_id,
"failed",
error=f"Polling error: {str(e)}"
)
break
self.polling_tasks.pop(task_id, None)
if poll_count >= max_polls:
await self.tracker.update_status(
operation_id,
"failed",
error="Task polling timeout"
)
Webhook Support
Reliable Webhook Handler
Implement webhooks with reliability patterns:
class WebhookHandler:
def __init__(self, tracker, notifier, secret_key):
self.tracker = tracker
self.notifier = notifier
self.secret_key = secret_key
self.processed_events = {}
def verify_webhook_signature(self, payload: bytes, signature: str) -> bool:
"""Verify webhook authenticity"""
expected_signature = hmac.new(
self.secret_key.encode(),
payload,
hashlib.sha256
).hexdigest()
return signature == f"sha256={expected_signature}"
async def is_replay_attack(self, timestamp: str, event_id: str) -> bool:
"""Prevent replay attacks using timestamp and event ID"""
event_time = datetime.fromisoformat(timestamp.replace('Z', '+00:00'))
now = datetime.now()
if now - event_time > timedelta(minutes=5):
return True
return event_id in self.processed_events
Webhook + Polling Backup
Never rely solely on webhooks:
class ReliableWebhookOrchestrator:
def __init__(self):
self.webhook_timeout = timedelta(minutes=10)
self.backup_polling_delay = timedelta(minutes=2)
async def _handle_submitted_with_webhook(self, operation_id, task_id):
"""Handle submitted task with webhook + backup polling"""
async def backup_polling():
await asyncio.sleep(self.backup_polling_delay.total_seconds())
operation = await tracker.get_operation(operation_id)
if operation["status"] not in ["completed", "failed", "canceled"]:
logger.info(f"Starting backup polling for task {task_id}")
await self._poll_for_completion(operation_id, task_id, interval=60)
asyncio.create_task(backup_polling())
Example Orchestrator
Complete orchestrator implementation:
class AdCPOrchestrator:
def __init__(self):
self.adcp = AdCPClient()
self.tracker = OperationTracker(db)
self.handler = AsyncOperationHandler(self.adcp, self.tracker, UserNotifier())
self.webhook_base_url = "https://orchestrator.com/webhooks"
async def create_campaign(self, user_id, request, enable_webhook=True):
"""Create a campaign with governance validation and full async handling.
Plans must already be synced via sync_plans before calling this method.
Plan creation happens during the planning phase, not at campaign creation time.
"""
# 1. Run intent check (plan must already exist)
if request.get("governance_context"):
gov_check = await self.adcp.call("check_governance", {
"plan_id": request["governance_context"]["plan_id"],
"caller": request["governance_context"]["caller"],
"tool": "create_media_buy",
"payload": request
})
if gov_check["status"] == "denied":
raise GovernanceDeniedError(gov_check["explanation"])
if gov_check["status"] == "conditions":
raise GovernanceConditionsError(gov_check["conditions"])
# If check_governance needs human review internally, it returns
# async task status (submitted/working) and resolves to
# approved or denied — standard task lifecycle.
# 2. Create the media buy
await self._create_media_buy(user_id, request, enable_webhook)
async def _create_media_buy(self, user_id, request, enable_webhook=True):
"""Create a media buy with full async handling."""
# 1. Prepare webhook configuration
webhook_config = None
if enable_webhook:
webhook_config = {
"webhook_url": f"{self.webhook_base_url}/adcp/{user_id}",
"webhook_auth": {
"type": "bearer",
"credentials": await self.get_webhook_token(user_id)
}
}
# 2. Create operation record
operation_id = await self.tracker.create_operation(
"create_media_buy",
request,
webhook_config=webhook_config
)
try:
# 3. Call AdCP
response = await self.adcp.call("create_media_buy", request, webhook_config)
# 4. Handle response
await self.handler.handle_operation_response(operation_id, response)
# 5. Return appropriate response to user
return self._format_user_response(operation_id, response)
except Exception as e:
await self.tracker.update_status(operation_id, "failed", error=str(e))
raise
async def reconcile_state_on_startup(self):
"""Recover from orchestrator restart"""
reconciliation = await self.tracker.reconcile_with_server(self.adcp)
logger.info(f"State reconciliation: {reconciliation}")
for task_id in reconciliation["orphaned_on_server"]:
# Resume monitoring orphaned tasks
operation_id = await self.tracker.create_operation(
"unknown",
{},
status="submitted"
)
await self.tracker.update_status(operation_id, "submitted", task_id=task_id)
asyncio.create_task(
self.handler._poll_for_completion(operation_id, task_id)
)
Governance in the Campaign Lifecycle
Plan creation (sync_plans) happens during the planning phase — before any campaigns exist. Governance checks happen during campaign execution. These are separate concerns.
Planning phase (once per media plan):
sync_plans — orchestrator pushes the plan to the governance agent
Campaign execution (per media buy):
check_governance(tool + payload) → create_media_buy → check_governance(media_buy_id + planned_delivery) → delivery → report_plan_outcome
| Phase | Who calls | Task | What happens on failure |
|---|
| Intent check | Orchestrator | check_governance (tool + payload) | Campaign violates buyer’s plan — denied or conditioned before any spend. If the governance agent needs human review, the task goes async and resolves to approved or denied. |
| Execution check | Seller | check_governance (media_buy_id + planned_delivery) | Seller’s delivery plan doesn’t match buyer’s expectations — purchase blocked |
| Delivery check | Seller | check_governance (phase: delivery + delivery_metrics) | Drift detected — pacing, geo, or channel distribution deviates from plan |
| Plan outcome | Orchestrator | report_plan_outcome | No feedback loop — governance agent cannot improve future recommendations |
See the media buy governance workflow for the complete sequence with code examples, and the seller integration guide for the seller’s execution check obligations.
Best Practices
1. Persistent Storage
Always use persistent storage for operation state:
- Database (PostgreSQL, MongoDB)
- Message queue (Redis, RabbitMQ)
- Distributed cache (Redis Cluster)
2. Idempotency
Make all operations idempotent:
async def create_media_buy_idempotent(self, request):
existing = await self.db.operations.find_one({
"type": "create_media_buy",
"request.po_number": request["po_number"],
"status": {"$in": ["created", "active"]}
})
if existing:
return existing["result"]
return await self.create_media_buy(request)
3. Timeout Handling
Implement reasonable timeouts:
OPERATION_TIMEOUTS = {
"create_media_buy": timedelta(hours=24),
"update_media_buy": timedelta(hours=12),
"creative_approval": timedelta(hours=48)
}
4. Error Recovery
Implement retry logic with circuit breakers:
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(min=1, max=60),
retry=retry_if_exception_type(TransientError)
)
async def call_adcp_api(self, tool, params):
try:
return await self.adcp.call(tool, params)
except RateLimitError:
raise TransientError("Rate limited")
except NetworkError:
raise TransientError("Network error")
5. Monitoring and Alerting
Track key metrics:
- Pending operation count by type
- Average approval time
- Rejection rate
- Task timeout rate
- API error rate
User Communication
Keep users informed about pending operations:
class UserNotifier:
async def notify_pending_approval(self, user_id, operation):
message = {
"type": "pending_approval",
"operation_id": operation["id"],
"message": "Your media buy requires publisher approval",
"estimated_time": "2-4 hours"
}
await self.send_notification(user_id, message)
async def notify_approval(self, user_id, operation):
message = {
"type": "operation_approved",
"operation_id": operation["id"],
"message": "Your media buy has been approved",
"media_buy_id": operation["result"]["media_buy_id"]
}
await self.send_notification(user_id, message)
Summary
Building a robust AdCP orchestrator requires:
- Asynchronous design throughout
- Proper state management with persistence
- Graceful handling of pending states
- User communication for long-running operations
- Monitoring and observability
Remember: Pending states are not errors - they’re a normal part of the advertising workflow.
Next Steps