Building a Multi-Agent Orchestration Platform on Azure Container Apps
Architecture guide for multi-agent AI orchestration on Azure Container Apps — covering KEDA scaling, Dapr state management, Service Bus communication, OpenTelemetry observability, and IaC deployment.
Multi-agent AI systems are moving from research demos to production workloads. The pattern is straightforward: instead of one monolithic LLM call, you decompose a complex task into specialized agents — a planner, a researcher, an executor, a critic — each responsible for a specific capability. The orchestration challenge is where most teams get stuck.
This post presents a production architecture for multi-agent orchestration on Azure Container Apps. We chose Container Apps over Functions, Kubernetes, and custom VMs for specific reasons. We cover the communication patterns, state management, scaling, observability, and infrastructure-as-code to deploy the entire platform.
Why Azure Container Apps for Agents
The requirements for a multi-agent platform are:
- Scale-to-zero: Agents should not burn compute when idle
- Independent scaling: Each agent type scales based on its own workload
- Service-to-service communication: Agents need to call each other reliably
- State management: Agents need shared and private state without managing databases
- Observability: Distributed traces across agent interactions
- Cost control: Pay only for active compute
Azure Container Apps provides all six. Here is the comparison:
| Requirement | Container Apps | AKS | Azure Functions |
|---|---|---|---|
| Scale-to-zero | Yes (KEDA) | Yes (KEDA, but cluster overhead) | Yes |
| Independent scaling | Yes (per-app KEDA rules) | Yes | Yes |
| Service-to-service | Dapr built-in | Manual or Dapr sidecar | Limited |
| State management | Dapr state stores | Manual or Dapr sidecar | Durable entities |
| GPU support | Yes (workload profiles) | Yes | No |
| Container flexibility | Full | Full | Runtime constraints |
| Ops overhead | Low (serverless) | High (cluster mgmt) | Low |
Container Apps gives you the container flexibility of Kubernetes, the serverless economics of Functions, and Dapr built in without managing sidecars yourself.
Architecture Overview
Agent Types
Orchestrator Agent: Receives incoming tasks, decomposes them into subtasks, assigns to specialist agents, aggregates results. Scales based on incoming request queue depth.
Researcher Agent: Performs RAG queries, web searches, document analysis. Scales based on research task queue depth. May require more memory for large context windows.
Executor Agent: Performs actions — API calls, database writes, code execution in sandboxed environments. Scales based on execution task queue depth. Requires strict permission boundaries.
Critic Agent: Evaluates outputs from other agents against quality criteria. Scales based on evaluation queue depth. Lightweight compute requirements.
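These four roles meet through the orchestrator's decomposition step, which (as shown in the orchestrator implementation later) produces a JSON plan of subtasks. A hypothetical example of such a plan; the field names match what `handle_new_task` reads, but the concrete task content is illustrative:

```python
# A hypothetical decomposition plan the orchestrator's LLM call might return.
# Field names (id, assigned_agent, instruction, context, dependencies) match
# what handle_new_task consumes; the task content itself is made up.
plan = {
    "subtasks": [
        {
            "id": "s1",
            "assigned_agent": "researcher",
            "instruction": "Collect recent pricing pages for product X",
            "context": {},
            "dependencies": [],
        },
        {
            "id": "s2",
            "assigned_agent": "executor",
            "instruction": "Write one summary row per competitor to the database",
            "dependencies": ["s1"],
        },
        {
            "id": "s3",
            "assigned_agent": "critic",
            "instruction": "Check the summary for unsupported claims",
            "dependencies": ["s2"],
        },
    ]
}
```

The `dependencies` field lets the orchestrator hold back a subtask until its prerequisites have reported results.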
Agent Task Flow

A task enters through the orchestrator, which decomposes it and publishes subtasks to the specialist agents' Service Bus topics. Workers process their subtasks and publish to a shared results topic, the critic evaluates outputs against quality criteria, and the orchestrator aggregates everything into the final response.
Agent Communication Patterns
Pattern 1: Event-Driven via Service Bus (Recommended)
Asynchronous, decoupled, resilient. Each agent publishes results to a topic and subscribes to its own task topic.
# agent_base.py — Base class for all agents
from dapr.clients import DaprClient
import json
import uuid
from datetime import datetime
class AgentBase:
def __init__(self, agent_type: str):
self.agent_type = agent_type
self.client = DaprClient()
self.pubsub_name = "agent-pubsub" # Dapr component name
async def publish_task(self, target_agent: str, task: dict):
"""Publish a task for another agent to process."""
message = {
"task_id": str(uuid.uuid4()),
"source_agent": self.agent_type,
"target_agent": target_agent,
"payload": task,
"correlation_id": task.get("correlation_id", str(uuid.uuid4())),
"timestamp": datetime.utcnow().isoformat(),
}
self.client.publish_event(
pubsub_name=self.pubsub_name,
topic_name=f"agent.{target_agent}.tasks",
data=json.dumps(message),
data_content_type="application/json",
)
async def publish_result(self, task_id: str, correlation_id: str,
result: dict):
"""Publish the result of a completed task."""
message = {
"task_id": task_id,
"source_agent": self.agent_type,
"correlation_id": correlation_id,
"result": result,
"status": "completed",
"timestamp": datetime.utcnow().isoformat(),
}
self.client.publish_event(
pubsub_name=self.pubsub_name,
topic_name="agent.results",
data=json.dumps(message),
data_content_type="application/json",
        )

Dapr Pub/Sub component configuration for Service Bus:
# components/pubsub.yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: agent-pubsub
spec:
type: pubsub.azure.servicebus.topics
version: v1
metadata:
- name: connectionString
secretKeyRef:
name: servicebus-connection
key: connectionString
- name: maxDeliveryCount
value: "5"
- name: lockDurationInSec
value: "60"
- name: defaultMessageTimeToLiveInSec
value: "3600"
- name: maxConcurrentHandlers
  value: "10"

Pattern 2: Direct HTTP via Dapr Service Invocation
For synchronous, low-latency agent-to-agent calls when you need an immediate response.
async def invoke_agent_directly(self, target_agent: str, method: str,
data: dict, timeout: int = 30):
"""Synchronous agent-to-agent call via Dapr service invocation."""
response = self.client.invoke_method(
app_id=target_agent,
method_name=method,
data=json.dumps(data),
content_type="application/json",
http_verb="POST",
timeout=timeout,
)
        return json.loads(response.data)

When to Use Which Pattern
| Scenario | Pattern | Reason |
|---|---|---|
| Task decomposition and assignment | Event-driven | Decouple orchestrator from worker availability |
| Critic evaluating executor output | Event-driven | Critic can batch evaluations |
| Orchestrator checking agent health | Direct HTTP | Need immediate response |
| Quick validation before task execution | Direct HTTP | Latency-sensitive |
| Fan-out to multiple agents | Event-driven | Parallel processing, no blocking |
| Final result aggregation | Event-driven | Collect results as they arrive |
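The fan-out and aggregation rows depend on the orchestrator knowing when the last result has arrived. A minimal in-memory sketch of that bookkeeping, with illustrative names; the production version keeps the same counters in the Dapr state store, as shown in the orchestrator implementation below:

```python
from dataclasses import dataclass, field

@dataclass
class FanInTracker:
    """Tracks fan-out completion for one correlation_id (illustrative)."""
    total_subtasks: int
    results: dict = field(default_factory=dict)

    def record(self, task_id: str, result: dict) -> bool:
        """Store one result; return True once every subtask has reported.

        Keying by task_id means a redelivered result overwrites rather
        than double-counts, which matters with at-least-once delivery.
        """
        self.results[task_id] = result
        return len(self.results) >= self.total_subtasks

tracker = FanInTracker(total_subtasks=3)
tracker.record("t1", {"answer": "a"})   # not done yet
tracker.record("t2", {"answer": "b"})   # not done yet
done = tracker.record("t3", {"answer": "c"})  # all results in
```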
State Management with Dapr
Agents need state for task tracking, conversation context, and coordination. Dapr state stores abstract the backend.
class AgentStateManager:
def __init__(self, store_name: str = "agent-statestore"):
self.client = DaprClient()
self.store_name = store_name
    async def save_task_state(self, task_id: str, state: dict):
        """Save task state as JSON (last-writer-wins; pair with the
        distributed lock below when concurrent updates are possible)."""
        self.client.save_state(
            store_name=self.store_name,
            key=f"task:{task_id}",
            value=json.dumps(state),
            state_metadata={"contentType": "application/json"},
        )
async def get_task_state(self, task_id: str) -> dict:
"""Retrieve task state."""
response = self.client.get_state(
store_name=self.store_name,
key=f"task:{task_id}",
)
if response.data:
return json.loads(response.data)
return {}
async def save_conversation_context(self, correlation_id: str,
messages: list, metadata: dict):
"""Persist conversation context across agent interactions."""
state = {
"messages": messages[-50:],
"metadata": metadata,
"updated_at": datetime.utcnow().isoformat(),
}
self.client.save_state(
store_name=self.store_name,
key=f"context:{correlation_id}",
value=json.dumps(state),
)
async def acquire_task_lock(self, task_id: str, owner: str,
ttl_seconds: int = 30) -> bool:
"""Distributed lock to prevent concurrent task processing."""
try:
lock_response = self.client.try_lock(
store_name="agent-lockstore",
resource_id=f"task-lock:{task_id}",
lock_owner=owner,
expiry_in_seconds=ttl_seconds,
)
return lock_response.success
except Exception:
            return False

State store component for Redis:
# components/statestore.yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: agent-statestore
spec:
type: state.redis
version: v1
metadata:
- name: redisHost
value: "agent-redis.redis.cache.windows.net:6380"
- name: redisPassword
secretKeyRef:
name: redis-password
key: password
- name: enableTLS
value: "true"
- name: actorStateStore
  value: "true"

The Orchestrator Agent: Detailed Implementation
The orchestrator is the central coordinator. It decomposes tasks, assigns work, tracks progress, and aggregates results.
import os
import json
import uuid
import asyncio
from datetime import datetime
from fastapi import FastAPI
from dapr.ext.fastapi import DaprApp
from openai import AzureOpenAI
app = FastAPI()
dapr_app = DaprApp(app)
class OrchestratorAgent(AgentBase):
def __init__(self):
super().__init__("orchestrator")
self.state = AgentStateManager()
self.llm = AzureOpenAI(
azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
api_version="2025-04-01-preview",
)
async def decompose_task(self, task: dict) -> list:
"""Use LLM to decompose a complex task into subtasks."""
response = self.llm.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": DECOMPOSITION_PROMPT},
{"role": "user", "content": json.dumps(task)},
],
response_format={"type": "json_object"},
temperature=0.1,
)
plan = json.loads(response.choices[0].message.content)
return plan["subtasks"]
async def handle_new_task(self, task: dict):
"""Main entry point for new tasks."""
correlation_id = task.get("correlation_id", str(uuid.uuid4()))
subtasks = await self.decompose_task(task)
task_state = {
"correlation_id": correlation_id,
"status": "in_progress",
"total_subtasks": len(subtasks),
"completed_subtasks": 0,
"results": {},
"created_at": datetime.utcnow().isoformat(),
}
await self.state.save_task_state(correlation_id, task_state)
for subtask in subtasks:
target_agent = subtask["assigned_agent"]
await self.publish_task(target_agent, {
"correlation_id": correlation_id,
"subtask_id": subtask["id"],
"instruction": subtask["instruction"],
"context": subtask.get("context", {}),
"dependencies": subtask.get("dependencies", []),
})
    async def handle_result(self, result: dict):
        """Process a result from a worker agent."""
        correlation_id = result["correlation_id"]
        lock_owner = f"orchestrator-{uuid.uuid4().hex[:8]}"
        # Bounded retry on lock contention instead of unbounded recursion
        for _ in range(10):
            if await self.state.acquire_task_lock(correlation_id, lock_owner):
                break
            await asyncio.sleep(0.5)
        else:
            raise TimeoutError(f"Could not acquire lock for {correlation_id}")
        try:
            task_state = await self.state.get_task_state(correlation_id)
            task_state["results"][result["task_id"]] = result["result"]
            task_state["completed_subtasks"] += 1
            if task_state["completed_subtasks"] >= task_state["total_subtasks"]:
                task_state["status"] = "aggregating"
                await self.state.save_task_state(correlation_id, task_state)
                await self._aggregate_and_respond(correlation_id, task_state)
            else:
                await self.state.save_task_state(correlation_id, task_state)
        finally:
            # Release the lock so retries and later results are not blocked
            self.state.client.unlock(
                store_name="agent-lockstore",
                resource_id=f"task-lock:{correlation_id}",
                lock_owner=lock_owner,
            )
orchestrator = OrchestratorAgent()
@dapr_app.subscribe(pubsub="agent-pubsub", topic="agent.orchestrator.tasks")
async def handle_task(event: dict):
    # Dapr wraps messages in a CloudEvent envelope; the payload is under "data"
    await orchestrator.handle_new_task(event["data"])

@dapr_app.subscribe(pubsub="agent-pubsub", topic="agent.results")
async def handle_agent_result(event: dict):
    await orchestrator.handle_result(event["data"])

Scaling with KEDA
Each agent scales independently based on its workload. KEDA (built into Container Apps) provides custom scaling rules.
# Container Apps scaling configuration
properties:
template:
scale:
minReplicas: 0
maxReplicas: 20
rules:
      - name: servicebus-topic-depth
        custom:
          type: azure-servicebus
          metadata:
            # The agents communicate over topics (see the pub/sub component),
            # so the scaler watches a topic subscription, not a queue
            topicName: "agent.researcher.tasks"
            subscriptionName: "researcher"
            namespace: "agent-servicebus"
            messageCount: "5"
          auth:
            - secretRef: servicebus-connection
              triggerParameter: connection
- name: cpu-utilization
custom:
type: cpu
metadata:
type: Utilization
          value: "70"

Scaling strategy per agent type:
| Agent | Min Replicas | Max Replicas | Scale Trigger | Scale Threshold |
|---|---|---|---|---|
| Orchestrator | 1 | 5 | Queue depth | 10 messages |
| Researcher | 0 | 20 | Queue depth | 5 messages |
| Executor | 0 | 10 | Queue depth | 3 messages |
| Critic | 0 | 10 | Queue depth | 5 messages |
Keep the orchestrator at minimum 1 replica for fast response to new tasks. Worker agents scale to zero when idle.
Observability with OpenTelemetry
Distributed tracing across agents is essential for debugging and performance optimization.
import os

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.resources import Resource
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace.export import BatchSpanProcessor
def setup_telemetry(service_name: str):
provider = TracerProvider(resource=Resource.create({
"service.name": service_name,
"service.namespace": "agent-platform",
}))
exporter = OTLPSpanExporter(
endpoint=os.environ.get("OTEL_EXPORTER_OTLP_ENDPOINT",
"http://otel-collector:4317")
)
provider.add_span_processor(BatchSpanProcessor(exporter))
trace.set_tracer_provider(provider)
return trace.get_tracer(service_name)
tracer = setup_telemetry("orchestrator-agent")
# Usage in agent methods
async def decompose_task(self, task: dict):
with tracer.start_as_current_span("decompose_task") as span:
span.set_attribute("task.correlation_id", task.get("correlation_id"))
span.set_attribute("task.type", task.get("type", "unknown"))
subtasks = await self._call_llm_for_decomposition(task)
span.set_attribute("subtasks.count", len(subtasks))
        return subtasks

Infrastructure as Code: Bicep Deployment
The complete platform deployed with a single Bicep template.
// main.bicep — Multi-agent orchestration platform
param location string = 'westeurope'
param environmentName string = 'agent-platform'
// Container Apps Environment
resource containerEnv 'Microsoft.App/managedEnvironments@2024-03-01' = {
name: '${environmentName}-env'
location: location
properties: {
daprAIConnectionString: appInsights.properties.ConnectionString
appLogsConfiguration: {
destination: 'log-analytics'
logAnalyticsConfiguration: {
customerId: logAnalytics.properties.customerId
sharedKey: logAnalytics.listKeys().primarySharedKey
}
}
workloadProfiles: [
{ name: 'Consumption', workloadProfileType: 'Consumption' }
{
name: 'gpu-agents'
workloadProfileType: 'NC24-A100'
minimumCount: 0
maximumCount: 3
}
]
}
}
// Dapr Pub/Sub Component — Service Bus
resource daprPubsub 'Microsoft.App/managedEnvironments/daprComponents@2024-03-01' = {
parent: containerEnv
name: 'agent-pubsub'
properties: {
componentType: 'pubsub.azure.servicebus.topics'
version: 'v1'
secrets: [
{
name: 'sb-connection'
value: serviceBus.listKeys().primaryConnectionString
}
]
metadata: [
{ name: 'connectionString', secretRef: 'sb-connection' }
{ name: 'maxDeliveryCount', value: '5' }
]
scopes: ['orchestrator', 'researcher', 'executor', 'critic']
}
}
// Dapr State Store — Redis
resource daprStateStore 'Microsoft.App/managedEnvironments/daprComponents@2024-03-01' = {
parent: containerEnv
name: 'agent-statestore'
properties: {
componentType: 'state.redis'
version: 'v1'
secrets: [
{ name: 'redis-password', value: redis.listKeys().primaryKey }
]
metadata: [
{ name: 'redisHost', value: '${redis.properties.hostName}:6380' }
{ name: 'redisPassword', secretRef: 'redis-password' }
{ name: 'enableTLS', value: 'true' }
]
scopes: ['orchestrator', 'researcher', 'executor', 'critic']
}
}
// Orchestrator Agent
resource orchestratorApp 'Microsoft.App/containerApps@2024-03-01' = {
name: 'orchestrator'
location: location
properties: {
managedEnvironmentId: containerEnv.id
configuration: {
dapr: {
enabled: true
appId: 'orchestrator'
appPort: 8000
}
ingress: {
external: true
targetPort: 8000
transport: 'http'
}
      secrets: [
        { name: 'openai-endpoint', value: openAI.properties.endpoint }
        // Referenced by the KEDA scale rule below; must be defined here
        { name: 'sb-connection', value: serviceBus.listKeys().primaryConnectionString }
      ]
}
template: {
containers: [
{
name: 'orchestrator'
image: '${containerRegistry.properties.loginServer}/agents/orchestrator:latest'
resources: { cpu: json('1.0'), memory: '2Gi' }
env: [
{
name: 'AZURE_OPENAI_ENDPOINT'
secretRef: 'openai-endpoint'
}
{
name: 'OTEL_EXPORTER_OTLP_ENDPOINT'
value: 'http://otel-collector:4317'
}
]
}
]
scale: {
minReplicas: 1
maxReplicas: 5
rules: [
{
name: 'queue-depth'
custom: {
type: 'azure-servicebus'
metadata: {
topicName: 'agent.orchestrator.tasks'
subscriptionName: 'orchestrator'
messageCount: '10'
}
auth: [
{
secretRef: 'sb-connection'
triggerParameter: 'connection'
}
]
}
}
]
}
}
}
}

Deployment Pipeline
# .github/workflows/deploy-agents.yml
name: Deploy Agent Platform
on:
push:
branches: [main]
paths: ['agents/**', 'infra/**']
jobs:
deploy-infra:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: azure/login@v2
with:
creds: ${{ secrets.AZURE_CREDENTIALS }}
- uses: azure/arm-deploy@v2
with:
resourceGroupName: rg-agent-platform
template: ./infra/main.bicep
parameters: environmentName=agent-platform location=westeurope
build-and-deploy-agents:
needs: deploy-infra
strategy:
matrix:
agent: [orchestrator, researcher, executor, critic]
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
      - uses: azure/docker-login@v1
with:
login-server: ${{ secrets.ACR_LOGIN_SERVER }}
username: ${{ secrets.ACR_USERNAME }}
password: ${{ secrets.ACR_PASSWORD }}
- run: |
docker build -t ${{ secrets.ACR_LOGIN_SERVER }}/agents/${{ matrix.agent }}:${{ github.sha }} \
./agents/${{ matrix.agent }}
docker push ${{ secrets.ACR_LOGIN_SERVER }}/agents/${{ matrix.agent }}:${{ github.sha }}
- uses: azure/container-apps-deploy-action@v2
with:
containerAppName: ${{ matrix.agent }}
resourceGroup: rg-agent-platform
imageToDeploy: >-
            ${{ secrets.ACR_LOGIN_SERVER }}/agents/${{ matrix.agent }}:${{ github.sha }}

Production Considerations
Idempotency: Every agent operation must be idempotent. Service Bus may deliver messages more than once. Use the task ID as an idempotency key in the state store.
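A sketch of that idempotency guard, using an in-memory dict where production code would read and write the Dapr state store; `process_once` and its helper names are illustrative:

```python
processed: dict[str, dict] = {}  # stands in for the Dapr state store

def process_once(task_id: str, handler, payload: dict) -> dict:
    """Run handler at most once per task_id, even if the message is redelivered."""
    if task_id in processed:
        # Duplicate delivery: return the cached result instead of re-executing
        return processed[task_id]
    result = handler(payload)
    processed[task_id] = result  # record completion before acking the message
    return result

calls = []
def handler(payload):
    calls.append(payload)
    return {"ok": True}

process_once("t-1", handler, {"x": 1})
process_once("t-1", handler, {"x": 1})  # redelivered: handler is not re-run
assert len(calls) == 1
```

In the real platform the completion record and the side effect should be committed together (or the side effect itself made idempotent), otherwise a crash between the two still produces a duplicate.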
Timeout handling: Set realistic timeouts for LLM calls (30-60 seconds for GPT-4o). Implement circuit breakers to stop failures from cascading when Azure OpenAI has latency spikes.
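A minimal circuit-breaker sketch for wrapping LLM calls, stdlib only; the `CircuitBreaker` class, its thresholds, and the cooldown are illustrative choices, not a library API:

```python
import time

class CircuitBreaker:
    """Opens after `threshold` consecutive failures; probes again after `cooldown` seconds."""

    def __init__(self, threshold=5, cooldown=30.0):
        self.threshold = threshold
        self.cooldown = cooldown
        self.failures = 0
        self.opened_at = None

    def call(self, fn, *args, **kwargs):
        if self.opened_at is not None:
            if time.monotonic() - self.opened_at < self.cooldown:
                # Fail fast instead of piling requests onto a struggling backend
                raise RuntimeError("circuit open: skipping LLM call")
            self.opened_at = None  # half-open: let one probe call through
        try:
            result = fn(*args, **kwargs)
        except Exception:
            self.failures += 1
            if self.failures >= self.threshold:
                self.opened_at = time.monotonic()
            raise
        self.failures = 0  # success resets the failure count
        return result
```

Usage would look like `breaker.call(self.llm.chat.completions.create, model="gpt-4o", messages=...)`, with the caller handling the fast-fail `RuntimeError` by falling back or re-queuing the task.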
Cost control: Set max replicas conservatively. Use Consumption workload profiles for CPU-only agents. Reserve GPU profiles only for agents that need embedding generation or local model inference.
Security: Each agent should have its own managed identity with least-privilege access. The executor agent needs the most restricted permissions — never give it broad Azure RBAC roles.
Conceptualise designs and deploys multi-agent orchestration platforms on Azure Container Apps, from architecture through production operations. If you are building AI agent systems that need enterprise reliability, contact us at mbrahim@conceptualise.de.