Skip to main content

Orchestrator Design Guide

This guide provides best practices and requirements for implementing an AdCP:Buy orchestrator that properly handles asynchronous operations, pending states, and human-in-the-loop workflows.

Core Design Principles

1. Asynchronous First

The AdCP:Buy protocol is inherently asynchronous. Operations may take seconds, hours, or even days to complete.

DO:
  • Design all operations as async/await
  • Store operation state persistently
  • Handle orchestrator restarts gracefully
  • Implement proper timeout handling
DON’T:
  • Assume immediate completion
  • Use synchronous blocking calls
  • Store state only in memory
  • Retry indefinitely without backoff

2. Task Status Management

Operations progress through standardized status values:
# AdCP task statuses (aligned with A2A TaskState), each annotated with what
# the orchestrator is expected to do when it sees it.
TASK_STATUSES = set([
    "submitted",       # long-running (hours to days): register a webhook or poll
    "working",         # short processing (< 120 seconds): poll frequently
    "input-required",  # waiting on user input/approval: continue the conversation
    "completed",       # success: process the results
    "failed",          # error: handle it appropriately
    "canceled",        # the user canceled the task
    "auth-required",   # authentication is needed
])

# 'submitted' vs 'working' is what tells a client how long to expect to wait.

3. State Machine Design

Implement proper state machines aligned with AdCP task statuses:
class OperationState(Enum):
    """Lifecycle states of an orchestrator operation.

    The first two members exist only on the orchestrator side; the rest
    mirror task statuses reported by the AdCP server, spelled with
    underscores.
    """

    # --- orchestrator-local ---
    REQUESTED = "requested"            # request received, nothing sent yet
    CALLING_ADCP = "calling_adcp"      # AdCP API call in flight

    # --- mirrored from the AdCP server ---
    SUBMITTED = "submitted"            # queued long-running operation
    WORKING = "working"                # short-term processing
    INPUT_REQUIRED = "input_required"  # waiting on user input/approval
    COMPLETED = "completed"            # finished successfully
    FAILED = "failed"                  # finished with an error
    CANCELED = "canceled"              # canceled by the user

# Valid state transitions, keyed by OperationState value.
# Terminal states map to an empty list so that every state has an entry and
# a lookup like VALID_TRANSITIONS[current_state] never raises KeyError.
VALID_TRANSITIONS = {
    # "failed" covers errors raised before the AdCP call is made
    # (create_media_buy marks the operation failed from its except block).
    "requested": ["calling_adcp", "failed"],
    "calling_adcp": ["submitted", "working", "input_required", "completed", "failed"],
    "submitted": ["working", "completed", "failed", "canceled"],
    # A user may cancel a task at any point before it finishes.
    "working": ["completed", "failed", "input_required", "canceled"],
    "input_required": ["submitted", "working", "completed", "failed", "canceled"],
    "completed": [],
    "failed": [],
    "canceled": [],
}

Implementation Requirements

1. Operation Tracking

Store all operation requests with comprehensive tracking:
class OperationTracker:
    """Persist and query AdCP operation records.

    ``db.operations`` is used with async (Motor-style) collection calls:
    ``insert_one``, ``update_one``, ``find_one`` and ``find(...).to_list``.
    """

    # Statuses of operations that still need polling, webhooks or user action.
    PENDING_STATUSES = ("submitted", "working", "input_required")

    def __init__(self, db):
        self.db = db

    @staticmethod
    def _utcnow():
        """Return the current time as a timezone-aware UTC datetime.

        Aware values can be compared with parsed webhook/server timestamps
        (which carry an offset); naive ``datetime.now()`` values cannot.
        """
        from datetime import datetime, timezone
        return datetime.now(timezone.utc)

    async def create_operation(self, operation_type, request_data, webhook_config=None,
                               status="requested"):
        """Insert a new operation record and return its generated id.

        Args:
            operation_type: e.g. "create_media_buy".
            request_data: the original request payload.
            webhook_config: optional webhook settings sent with the request.
            status: initial status; defaults to "requested".
        """
        now = self._utcnow()  # one timestamp so created_at == updated_at
        operation = {
            "id": str(uuid.uuid4()),
            "type": operation_type,
            "status": status,
            "request": request_data,
            "webhook_config": webhook_config,
            "created_at": now,
            "updated_at": now,
            "task_id": None,
            "context_id": None,
            "result": None,
            "error": None
        }
        await self.db.operations.insert_one(operation)
        return operation["id"]

    async def get_operation(self, operation_id):
        """Return the operation with ``operation_id``, or None if absent."""
        return await self.db.operations.find_one({"id": operation_id})

    async def find_by_task_id(self, task_id):
        """Return the operation linked to AdCP ``task_id``, or None."""
        return await self.db.operations.find_one({"task_id": task_id})

    async def update_status(self, operation_id, status, **kwargs):
        """Set ``status`` (plus any extra fields in ``kwargs``) on an operation.

        ``updated_at`` defaults to now but may be overridden through kwargs.
        """
        update = {
            "status": status,
            "updated_at": self._utcnow()
        }
        update.update(kwargs)

        await self.db.operations.update_one(
            {"id": operation_id},
            {"$set": update}
        )

    async def get_pending_operations(self):
        """Get all operations that need monitoring."""
        return await self.db.operations.find({
            "status": {"$in": list(self.PENDING_STATUSES)}
        }).to_list(length=None)

    async def reconcile_with_server(self, adcp_client):
        """Compare pending local operations with pending tasks from tasks/list.

        Returns a dict with the task ids pending only on the server
        (``orphaned_on_server``), those pending only locally
        (``missing_from_server``), and the pending counts on each side.
        """
        server_tasks = await adcp_client.call('tasks/list', {
            'filters': {'statuses': list(self.PENDING_STATUSES)}
        })

        server_task_ids = {task['task_id'] for task in server_tasks['tasks']}
        local_operations = await self.get_pending_operations()
        local_task_ids = {op['task_id'] for op in local_operations if op.get('task_id')}

        return {
            'orphaned_on_server': server_task_ids - local_task_ids,
            'missing_from_server': local_task_ids - server_task_ids,
            'total_pending_server': len(server_tasks['tasks']),
            'total_pending_local': len(local_operations)
        }

2. Async Operation Handler

Implement comprehensive async operation handling:
class AsyncOperationHandler:
    """Route AdCP task responses by status and poll tasks that are still running.

    Collaborators (as used below):
      * ``adcp_client``: ``await call(method, params)`` for ``tasks/get``.
      * ``tracker``: ``await update_status(operation_id, status, **fields)``
        and ``await get_operation(operation_id)``.
      * ``notifier``: ``notify_submitted_with_webhook``,
        ``notify_submitted_polling`` and ``notify_working`` coroutines.
    """

    # Statuses after which a task no longer changes, so polling stops.
    TERMINAL_STATUSES = ("completed", "failed", "canceled")

    def __init__(self, adcp_client, tracker, notifier):
        self.adcp = adcp_client
        self.tracker = tracker
        self.notifier = notifier
        self.polling_tasks = {}  # task_id -> asyncio.Task polling that task

    @staticmethod
    def _normalize_status(status):
        """Return ``status`` with hyphens replaced by underscores.

        The server may report A2A-style statuses such as "input-required",
        while the local state machine and the routing below use
        "input_required". Non-string values are returned unchanged.
        """
        if isinstance(status, str):
            return status.replace("-", "_")
        return status

    async def handle_operation_response(self, operation_id, response):
        """Persist an AdCP response and dispatch on its status.

        Args:
            operation_id: local operation id.
            response: AdCP response or task dict. ``status`` is always read;
                ``task_id``, ``context_id``, ``result`` and ``error`` are read
                when present.
        """
        status = self._normalize_status(response.get("status"))

        fields = {
            "result": response.get("result") if status == "completed" else None,
            "error": response.get("error") if status == "failed" else None,
        }
        # Only write identifiers the response actually carries, so a reply
        # without them cannot erase the task_id/context_id already stored.
        for key in ("task_id", "context_id"):
            if response.get(key) is not None:
                fields[key] = response[key]

        await self.tracker.update_status(operation_id, status, **fields)

        routes = {
            "completed": self._handle_completed,
            "failed": self._handle_failed,
            "submitted": self._handle_submitted,
            "working": self._handle_working,
            "input_required": self._handle_input_required,
        }
        route = routes.get(status)
        if route is not None:
            await route(operation_id, response)

    async def _handle_completed(self, operation_id, response):
        """Hook called after a task completes; the result is already stored.

        Does nothing by default; override it to notify users or start
        follow-up work. It has to exist: handle_operation_response routes to
        it, and a missing method would raise inside the poller and mark a
        successful operation as failed.
        """

    async def _handle_failed(self, operation_id, response):
        """Hook called after a task fails; the error is already stored.

        Does nothing by default; override it to alert users.
        """

    async def _handle_input_required(self, operation_id, response):
        """Hook called when the server needs user input or approval.

        Does nothing by default; override it to prompt the user.
        """

    def _start_polling(self, operation_id, task_id, interval):
        """Start a background poller for ``task_id`` unless one is running.

        Returns True if a new poller was started. Without this check every
        non-terminal poll result would spawn yet another poller for the task.
        """
        existing = self.polling_tasks.get(task_id)
        if existing is not None and not existing.done():
            return False
        self.polling_tasks[task_id] = asyncio.create_task(
            self._poll_for_completion(operation_id, task_id, interval=interval)
        )
        return True

    async def _handle_submitted(self, operation_id, response):
        """Handle long-running operations: rely on the webhook, or poll every 60 s."""
        task_id = response["task_id"]

        operation = await self.tracker.get_operation(operation_id)
        webhook_config = (operation or {}).get("webhook_config")

        if webhook_config:
            # Webhook will handle completion notification
            await self.notifier.notify_submitted_with_webhook(operation_id, task_id)
        elif self._start_polling(operation_id, task_id, interval=60):
            await self.notifier.notify_submitted_polling(operation_id, task_id)

    async def _handle_working(self, operation_id, response):
        """Handle short-term processing: poll every 5 s (should finish within 120 s)."""
        task_id = response["task_id"]
        if self._start_polling(operation_id, task_id, interval=5):
            await self.notifier.notify_working(operation_id, task_id)

    async def _poll_for_completion(self, operation_id, task_id, interval=60):
        """Poll ``tasks/get`` until the task reaches a terminal status.

        Polls at most 1440 times at the 60 s interval (about 24 hours) and at
        most 24 times otherwise (about 2 minutes at the 5 s working interval).
        A polling error, or running out of polls without a terminal status,
        marks the operation "failed".
        """
        max_polls = 1440 if interval == 60 else 24  # 24 hours or 2 minutes
        try:
            for _ in range(max_polls):
                await asyncio.sleep(interval)
                try:
                    task_response = await self.adcp.call('tasks/get', {
                        'task_id': task_id,
                        'include_result': True
                    })
                    # Update operation with latest status
                    await self.handle_operation_response(operation_id, task_response)
                    finished = (
                        self._normalize_status(task_response["status"])
                        in self.TERMINAL_STATUSES
                    )
                except Exception as e:
                    await self.tracker.update_status(
                        operation_id,
                        "failed",
                        error=f"Polling error: {str(e)}"
                    )
                    return
                if finished:
                    return
            # Reached only when every poll came back non-terminal; a terminal
            # answer on the final poll returns above instead of being
            # overwritten with a timeout.
            await self.tracker.update_status(
                operation_id,
                "failed",
                error="Task polling timeout"
            )
        finally:
            # Deregister only this poller, never one registered by someone else.
            if self.polling_tasks.get(task_id) is asyncio.current_task():
                del self.polling_tasks[task_id]

3. Task Monitoring

Implement efficient task monitoring with exponential backoff:
class TaskMonitor:
    """Poll an MCP server for a human-in-the-loop (HITL) task until it resolves.

    NOTE(review): ``find_task``, ``handle_task_completion`` and
    ``handle_task_rejection`` are called here but not defined; they must be
    supplied by a subclass or assigned on the instance. ``logger`` is assumed
    to be a module-level logger.
    """

    def __init__(self, mcp_client, initial_backoff=30, max_backoff=300):
        """
        Args:
            mcp_client: client exposing ``await call_tool(name, params)``.
            initial_backoff: seconds to wait after the first unresolved check.
            max_backoff: upper bound in seconds for the growing wait.
        """
        self.mcp = mcp_client
        self.initial_backoff = initial_backoff
        self.max_backoff = max_backoff
        self.monitoring_tasks = {}

    async def monitor_task(self, operation_id, task_id):
        """Monitor a HITL task until it completes, fails or is no longer listed.

        The wait between checks grows by 1.5x per round up to ``max_backoff``.
        It grows after errors too, so a failing server is not hit at a
        constant rate forever.
        """
        backoff = self.initial_backoff

        while True:
            try:
                response = await self.mcp.call_tool(
                    "get_pending_tasks",
                    {"task_type": "manual_approval"}
                )

                task = self.find_task(response["tasks"], task_id)
                if not task:
                    # No longer listed as pending - presumably resolved.
                    # NOTE(review): the operation is not updated on this path.
                    break

                if task["status"] == "completed":
                    await self.handle_task_completion(operation_id, task)
                    break
                if task["status"] == "failed":
                    await self.handle_task_rejection(operation_id, task)
                    break
            except Exception as e:
                logger.error(f"Error monitoring task {task_id}: {e}")

            # Exponential backoff, applied after both unresolved checks and errors
            await asyncio.sleep(backoff)
            backoff = min(backoff * 1.5, self.max_backoff)

4. Webhook Support with Reliability Patterns

Implement robust webhook endpoints following AdCP reliability patterns (see Core Concepts: Webhook Reliability):
import hashlib
import hmac
from datetime import datetime, timedelta, timezone

class WebhookHandler:
    """Verify and de-duplicate incoming AdCP webhook deliveries."""

    def __init__(self, tracker, notifier, secret_key, replay_window=timedelta(minutes=5)):
        """
        Args:
            tracker: operation tracker (stored for use by callers).
            notifier: user notifier (stored for use by callers).
            secret_key: shared HMAC secret used to sign webhook bodies.
            replay_window: deliveries older than this are treated as replays.
        """
        self.tracker = tracker
        self.notifier = notifier
        self.secret_key = secret_key
        self.replay_window = replay_window
        self.processed_events = {}  # In production, use Redis/database

    def verify_webhook_signature(self, payload: bytes, signature: str) -> bool:
        """Return True if ``signature`` equals "sha256=<hex HMAC-SHA256 of payload>".

        A missing or empty signature is rejected.
        """
        if not signature:
            return False
        expected = "sha256=" + hmac.new(
            self.secret_key.encode(),
            payload,
            hashlib.sha256
        ).hexdigest()
        # Constant-time comparison: timing must not reveal how much of a
        # forged signature matched. Bytes avoid compare_digest's ASCII-only
        # restriction on str arguments.
        return hmac.compare_digest(signature.encode(), expected.encode())

    async def is_replay_attack(self, timestamp: str, event_id: str) -> bool:
        """Return True if the event is too old or ``event_id`` was already processed.

        ``timestamp`` is ISO-8601; a trailing "Z" is accepted and an
        offset-less value is treated as UTC.
        """
        event_time = datetime.fromisoformat(timestamp.replace('Z', '+00:00'))
        if event_time.tzinfo is None:
            # Keep both operands aware; naive minus aware raises TypeError.
            event_time = event_time.replace(tzinfo=timezone.utc)
        now = datetime.now(timezone.utc)

        # Reject events older than the replay window
        if now - event_time > self.replay_window:
            return True

        # Check if we've processed this event before
        return event_id in self.processed_events

@app.post("/webhooks/adcp/{user_id}")
async def adcp_webhook(user_id: str, request: Request):
    """Handle AdCP task status updates with reliability patterns.

    Checks run in this order: signature, duplicate delivery, replay window,
    then processing. Accepted deliveries are always acknowledged, even when
    processing fails, so the sender does not retry them.

    NOTE(review): if ``app`` is FastAPI/Starlette, a returned
    ``(body, status)`` tuple is NOT turned into an HTTP status (that is a
    Flask convention); it is serialised as a JSON array with HTTP 200. To
    really send 401, raise ``fastapi.HTTPException(status_code=401)`` or
    return ``JSONResponse(..., status_code=401)``.
    """

    # Verify webhook signature
    signature = request.headers.get('x-adcp-signature')
    payload = await request.body()

    if not webhook_handler.verify_webhook_signature(payload, signature):
        return {"error": "Invalid signature"}, 401

    data = await request.json()
    event_id = data["event_id"]

    # Idempotent processing. This must come before the replay check, because
    # is_replay_attack() also flags already-seen event IDs. Checking replays
    # first would report honest redeliveries as attacks and leave this
    # branch unreachable.
    if event_id in webhook_handler.processed_events:
        return {"status": "already_processed"}, 200

    # Prevent replay attacks
    if await webhook_handler.is_replay_attack(data["timestamp"], event_id):
        return {"status": "ignored", "reason": "replay_attack"}, 200

    try:
        # Record event as processed immediately
        webhook_handler.processed_events[event_id] = {
            "timestamp": data["timestamp"],
            "task_id": data["task_id"]
        }

        # Process the webhook
        await process_adcp_webhook(user_id, data)

        return {"status": "processed"}, 200

    except Exception as e:
        # Log error (with traceback) but still return 200 to prevent retries
        logger.exception(f"Webhook processing error: {e}")
        return {"status": "error", "message": str(e)}, 200

async def process_adcp_webhook(user_id: str, data: dict):
    """Apply one verified AdCP webhook delivery to the matching operation.

    Ignores deliveries for unknown tasks, and deliveries whose timestamp is
    not newer than the operation's last update (out-of-order events). Backup
    polling is cancelled only once the task reaches a terminal status, so a
    non-final webhook never leaves the operation without a fallback.

    NOTE(review): relies on module-level ``db``, ``tracker``, ``handler``,
    ``notifier`` and ``logger`` defined elsewhere.
    """

    def as_utc(value):
        # Treat naive datetimes as UTC (PyMongo/Motor decode stored
        # datetimes as naive UTC by default) and convert aware ones to UTC,
        # so naive and aware values are never compared directly.
        if value.tzinfo is None:
            return value.replace(tzinfo=timezone.utc)
        return value.astimezone(timezone.utc)

    task_id = data["task_id"]
    raw_status = data["current_status"]
    # Store the local underscore form ("input_required") even when the
    # server uses A2A's hyphenated form ("input-required").
    current_status = (
        raw_status.replace("-", "_") if isinstance(raw_status, str) else raw_status
    )
    timestamp = data["timestamp"]

    # Find associated operation
    operation = await db.operations.find_one({"task_id": task_id})
    if not operation:
        logger.warning(f"Received webhook for unknown task: {task_id}")
        return

    # Check for out-of-order events using timestamps
    operation_timestamp = operation.get("updated_at")
    webhook_timestamp = as_utc(datetime.fromisoformat(timestamp.replace('Z', '+00:00')))

    if operation_timestamp and as_utc(operation_timestamp) >= webhook_timestamp:
        logger.info(f"Ignoring out-of-order webhook for task {task_id}")
        return

    # Update operation status
    update_data = {
        "status": current_status,
        "updated_at": webhook_timestamp
    }

    # Include result or error data
    if current_status == "completed" and "result" in data:
        update_data["result"] = data["result"]
    elif current_status == "failed" and "error" in data:
        update_data["error"] = data["error"]

    await tracker.update_status(operation["id"], **update_data)

    # Stop backup polling only once the task is final
    if current_status in ("completed", "failed", "canceled"):
        poller = handler.polling_tasks.pop(task_id, None)
        if poller is not None:
            poller.cancel()

    # Notify user of status change
    await notifier.notify_status_change(user_id, operation["id"], current_status)

class ReliableWebhookOrchestrator:
    """Orchestrator with webhook reliability patterns.

    Every webhook-backed task also gets a delayed backup poller, so a lost
    webhook never leaves an operation pending forever.

    NOTE(review): relies on names not defined in this class: the
    module-level ``tracker`` and ``logger``, ``self._poll_for_completion``
    (compare ``AsyncOperationHandler._poll_for_completion``),
    ``tracker.get_recent_operations_with_webhooks``,
    ``calculate_webhook_success_rate`` and ``self.temporarily_disable_webhooks``.
    """

    TERMINAL_STATUSES = ("completed", "failed", "canceled")

    def __init__(self):
        self.webhook_timeout = timedelta(minutes=10)  # Max wait for webhook
        self.backup_polling_delay = timedelta(minutes=2)  # Start backup polling
        # Strong references to background tasks. The event loop keeps only
        # weak references, so an unreferenced task can be garbage-collected
        # mid-execution.
        self._background_tasks = set()

    def _spawn(self, coro):
        """Schedule ``coro`` as a task and keep it referenced until it finishes."""
        task = asyncio.create_task(coro)
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)
        return task

    async def _is_pending(self, operation_id):
        """Return True while the stored operation has not reached a terminal status."""
        operation = await tracker.get_operation(operation_id)
        return operation["status"] not in self.TERMINAL_STATUSES

    async def _handle_submitted_with_webhook(self, operation_id, task_id, estimated_completion):
        """Handle a submitted task with webhook + backup polling.

        After ``backup_polling_delay``, poll if no webhook has finished the
        operation. After ``webhook_timeout``, log a warning and poll only if
        the backup poller is no longer running, so two pollers never race on
        the same task.
        """

        async def backup_polling():
            await asyncio.sleep(self.backup_polling_delay.total_seconds())
            # Check if webhook already updated the operation
            if await self._is_pending(operation_id):
                logger.info(f"Starting backup polling for task {task_id}")
                await self._poll_for_completion(operation_id, task_id, interval=60)

        backup = self._spawn(backup_polling())

        async def webhook_timeout():
            await asyncio.sleep(self.webhook_timeout.total_seconds())
            if await self._is_pending(operation_id):
                logger.warning(f"Webhook timeout for task {task_id}, falling back to polling")
                # Backup polling normally started earlier and is still
                # running; a second poller would only duplicate work.
                if backup.done():
                    await self._poll_for_completion(operation_id, task_id, interval=60)

        self._spawn(webhook_timeout())

    async def health_check_webhooks(self):
        """Periodic health check for webhook delivery."""
        # Check if recent operations with webhooks completed via webhook vs polling
        recent_ops = await tracker.get_recent_operations_with_webhooks()

        webhook_success_rate = calculate_webhook_success_rate(recent_ops)

        if webhook_success_rate < 0.8:  # Less than 80% webhook success
            logger.warning(f"Webhook success rate low: {webhook_success_rate:.2%}")
            # Consider disabling webhooks temporarily
            await self.temporarily_disable_webhooks()

Webhook Best Practices for Orchestrators

  1. Always implement polling backup - Never rely solely on webhooks
  2. Use exponential backoff - For backup polling when webhooks fail
  3. Monitor webhook health - Track success rates and disable if needed
  4. Handle duplicates gracefully - Use event IDs for idempotent processing
  5. Implement proper timeouts - Don’t wait forever for webhook delivery
  6. Verify webhook authenticity - Always validate signatures
  7. Log webhook events - Maintain audit trail for debugging

5. User Communication

Keep users informed about pending operations:
class UserNotifier:
    """Build user-facing notification payloads and hand them to ``send_notification``.

    NOTE(review): ``send_notification(user_id, message)`` is not defined here;
    it is expected to come from a subclass or mixin.
    """

    async def notify_pending_approval(self, user_id, operation):
        """Tell ``user_id`` that ``operation`` is waiting on publisher approval."""
        payload = dict(
            type="pending_approval",
            operation_id=operation["id"],
            operation_type=operation["type"],
            message="Your media buy requires publisher approval",
            estimated_time="2-4 hours",
            created_at=operation["created_at"],
        )
        await self.send_notification(user_id, payload)

    async def notify_approval(self, user_id, operation):
        """Tell ``user_id`` that ``operation`` was approved, including the media buy id."""
        payload = dict(
            type="operation_approved",
            operation_id=operation["id"],
            message="Your media buy has been approved and created",
            media_buy_id=operation["result"]["media_buy_id"],
        )
        await self.send_notification(user_id, payload)

Example Orchestrator Flow

class AdCPOrchestrator:
    """Example orchestrator: create media buys over MCP or A2A and track them.

    NOTE(review): ``AdCPClient``, ``db``, ``logger`` and
    ``self.get_webhook_token`` are not defined in this example and must be
    supplied by the surrounding application.
    """

    def __init__(self):
        self.adcp = AdCPClient()  # MCP or A2A client
        self.tracker = OperationTracker(db)
        self.handler = AsyncOperationHandler(self.adcp, self.tracker, UserNotifier())
        self.webhook_base_url = "https://orchestrator.com/webhooks"

    @staticmethod
    def _normalize_status(status):
        """Map A2A-style hyphenated statuses ("input-required") to the
        underscore form used by the local state machine ("input_required")."""
        if isinstance(status, str):
            return status.replace("-", "_")
        return status

    async def create_media_buy(self, user_id, request, enable_webhook=True):
        """Create a media buy with full async handling.

        Args:
            user_id: used in the webhook URL and to fetch the webhook token.
            request: create_media_buy parameters sent to the AdCP server.
            enable_webhook: when True, register a per-user webhook.

        Returns:
            A dict with ``operation_id`` and a normalized ``status``
            ("completed", "submitted", "working", "input_required",
            "canceled", "auth_required" or "failed") plus status-specific fields.

        Raises:
            Any exception from the AdCP call, after the operation is marked failed.
        """

        # 1. Prepare webhook configuration
        webhook_config = None
        if enable_webhook:
            webhook_config = {
                "webhook_url": f"{self.webhook_base_url}/adcp/{user_id}",
                "webhook_auth": {
                    "type": "bearer",
                    "credentials": await self.get_webhook_token(user_id)
                }
            }

        # 2. Create operation record
        operation_id = await self.tracker.create_operation(
            "create_media_buy",
            request,
            webhook_config=webhook_config
        )

        try:
            # Follow the state machine: requested -> calling_adcp -> server status
            await self.tracker.update_status(operation_id, "calling_adcp")

            # 3. Call AdCP with protocol-level webhook config
            if self.adcp.protocol == "mcp":
                response = await self.adcp.call(
                    "create_media_buy",
                    request,
                    webhook_config
                )
            else:  # A2A
                response = await self.adcp.send({
                    "message": {
                        "parts": [{
                            "kind": "data",
                            "data": {
                                "skill": "create_media_buy",
                                "parameters": request
                            }
                        }]
                    },
                    "push_notification_config": webhook_config
                })

            # 4. Handle response with unified status handling
            await self.handler.handle_operation_response(operation_id, response)

            # 5. Return appropriate response to user. Normalize first so an
            # A2A "input-required" is not misreported as a failure.
            status = self._normalize_status(response["status"])

            if status == "completed":
                return {
                    "operation_id": operation_id,
                    "status": "completed",
                    "media_buy_id": response["media_buy_id"],
                    "creative_deadline": response.get("creative_deadline")
                }

            if status == "submitted":
                return {
                    "operation_id": operation_id,
                    "status": "submitted",
                    "task_id": response["task_id"],
                    "message": response["message"],
                    "webhook_configured": webhook_config is not None
                }

            if status == "working":
                return {
                    "operation_id": operation_id,
                    "status": "working",
                    "task_id": response["task_id"],
                    "message": "Processing, will complete within 2 minutes"
                }

            if status == "input_required":
                return {
                    "operation_id": operation_id,
                    "status": "input_required",
                    "message": response["message"],
                    "context_id": response["context_id"]
                }

            if status in ("canceled", "auth_required"):
                # Not failures: report them as they are instead of "failed".
                return {
                    "operation_id": operation_id,
                    "status": status,
                    "message": response.get("message")
                }

            # failed (or a status this example does not handle)
            return {
                "operation_id": operation_id,
                "status": "failed",
                "error": response.get("detail", response.get("error"))
            }

        except Exception as e:
            await self.tracker.update_status(
                operation_id,
                "failed",
                error=str(e)
            )
            raise

    async def webhook_handler(self, user_id, payload):
        """Handle webhook notifications from AdCP server.

        NOTE(review): this passes ``payload["result"]`` as the response, so
        the routed status is whatever ``payload["result"]["status"]`` holds.
        The module-level process_adcp_webhook instead reads ``current_status``
        from the top-level payload. Confirm which shape the server sends.
        """
        task_id = payload["task_id"]

        # Find operation by task_id
        operation = await self.tracker.find_by_task_id(task_id)
        if not operation:
            logger.warning(f"Received webhook for unknown task: {task_id}")
            return

        # Update operation with webhook payload
        await self.handler.handle_operation_response(
            operation["id"],
            payload["result"]
        )

    async def reconcile_state_on_startup(self):
        """Recover from an orchestrator restart by syncing with the server.

        Creates a placeholder operation for every pending server task that
        has no local record, then starts polling it.
        """
        reconciliation = await self.tracker.reconcile_with_server(self.adcp)

        logger.info(f"State reconciliation: {reconciliation}")

        # Resume monitoring for orphaned server tasks
        for task_id in reconciliation["orphaned_on_server"]:
            # Placeholder record; the original operation type is unknown.
            # The status is set by update_status() below rather than by a
            # create_operation() keyword, which keeps this call compatible
            # with the basic create_operation signature.
            operation_id = await self.tracker.create_operation(
                "unknown",
                {}
            )
            await self.tracker.update_status(
                operation_id,
                "submitted",
                task_id=task_id
            )

            # Register the poller so webhooks can cancel it and so the task
            # stays referenced while it runs.
            self.handler.polling_tasks[task_id] = asyncio.create_task(
                self.handler._poll_for_completion(operation_id, task_id)
            )

Best Practices

1. Persistent Storage

Always use persistent storage for operation state:
  • Database (MongoDB, PostgreSQL)
  • Message queue (Redis, RabbitMQ)
  • Distributed cache (Redis Cluster)

2. Idempotency

Make all operations idempotent:
async def create_media_buy_idempotent(self, request, user_id=None):
    """Create a media buy at most once per purchase-order number.

    Looks for an earlier ``create_media_buy`` operation with the same
    ``po_number`` that the server has accepted (in flight or completed).
    A completed match returns its stored result. An in-flight match returns
    its operation id and status instead of creating a duplicate. Requests
    without a ``po_number`` cannot be de-duplicated and are always created.

    Args:
        request: create_media_buy request; ``po_number`` is the idempotency key.
        user_id: forwarded to ``self.create_media_buy(user_id, request)``.
    """
    po_number = request.get("po_number")
    if po_number is not None:
        # Match on operation statuses the tracker actually writes; the
        # media-buy states "created"/"active" never appear on operations.
        existing = await self.db.operations.find_one({
            "type": "create_media_buy",
            "request.po_number": po_number,
            "status": {"$in": ["submitted", "working", "input_required", "completed"]}
        })

        if existing:
            if existing["status"] == "completed":
                return existing["result"]
            return {
                "operation_id": existing["id"],
                "status": existing["status"],
                "task_id": existing.get("task_id"),
            }

    # Proceed with creation
    return await self.create_media_buy(user_id, request)

3. Timeout Handling

Implement reasonable timeouts:
# Maximum time an operation of each type may stay unresolved.
OPERATION_TIMEOUTS = dict(
    create_media_buy=timedelta(days=1),
    update_media_buy=timedelta(hours=12),
    creative_approval=timedelta(days=2),
)

4. Error Recovery

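Implement retry logic with exponential backoff for transient errors, and add a circuit breaker so that persistent failures stop triggering new calls: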
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(min=1, max=60),
    retry=retry_if_exception_type(TransientError)
)
async def call_adcp_api(self, tool, params):
    """Call an AdCP tool, retrying transient failures with exponential backoff.

    Rate-limit and network errors are re-raised as ``TransientError``,
    chained to the original exception, so the retry policy above retries
    them. Any other exception propagates without a retry.

    NOTE(review): the decorator looks like tenacity's; with its defaults,
    exhausting the three attempts raises ``tenacity.RetryError`` rather than
    the last ``TransientError``.
    """
    try:
        return await self.mcp.call_tool(tool, params)
    except RateLimitError as err:
        # Back off gracefully; keep the original error as the cause
        raise TransientError("Rate limited") from err
    except NetworkError as err:
        # Retry network errors; keep the original error as the cause
        raise TransientError("Network error") from err

5. Monitoring and Alerting

Track key metrics:
  • Pending operation count by type
  • Average approval time
  • Rejection rate
  • Task timeout rate
  • API error rate

Testing Considerations

1. Simulate Pending States

Test handling of all pending states:
@pytest.mark.asyncio
async def test_manual_approval_flow():
    """A media buy that needs manual approval is reported as 'submitted'.

    Without a webhook, it is then polled in the background.

    NOTE(review): assumes an MCP client (``orchestrator.adcp.protocol ==
    "mcp"``) and a ``create_request`` fixture defined elsewhere.
    """
    from unittest.mock import AsyncMock, patch

    orchestrator = AdCPOrchestrator()

    # The AdCP client is awaited, so the stub must be an AsyncMock.
    with patch.object(orchestrator.adcp, 'call', new_callable=AsyncMock) as mock:
        mock.return_value = {
            "status": "submitted",
            "task_id": "task_123",
            "message": "Manual approval required"
        }

        result = await orchestrator.create_media_buy(
            "user_1",
            create_request,
            enable_webhook=False
        )

    assert result["status"] == "submitted"
    assert result["task_id"] == "task_123"

    # Verify background polling started for the task, then stop it
    assert "task_123" in orchestrator.handler.polling_tasks
    orchestrator.handler.polling_tasks["task_123"].cancel()

2. Test Timeout Scenarios

Ensure proper timeout handling:
@pytest.mark.asyncio
async def test_operation_timeout():
    """An operation left pending past its timeout is marked failed.

    NOTE(review): ``tracker`` and ``timeout_handler`` are assumed fixtures;
    ``freeze_time`` is presumably freezegun's.
    """
    # Create an operation that will time out
    operation_id = await tracker.create_operation("create_media_buy", {})

    # Fast forward past the 24h create_media_buy timeout
    with freeze_time() as frozen:
        frozen.move_to(datetime.now() + timedelta(hours=25))

        # Check timeout handler
        await timeout_handler.check_timeouts()

        # The state machine has no "timed_out" state. Timeouts are recorded
        # as "failed", as AsyncOperationHandler._poll_for_completion does for
        # "Task polling timeout".
        op = await tracker.get_operation(operation_id)
        assert op["status"] == "failed"

Conclusion

Building a robust AdCP:Buy orchestrator requires:
  1. Asynchronous design throughout
  2. Proper state management
  3. Graceful handling of pending states
  4. User communication
  5. Monitoring and observability
Remember: Pending states are not errors - they’re a normal part of the advertising workflow.