AI & Data · 11 min read

Building a Multi-Agent Orchestration Platform on Azure Container Apps

Architecture guide for multi-agent AI orchestration on Azure Container Apps — covering KEDA scaling, Dapr state management, Service Bus communication, OpenTelemetry observability, and IaC deployment.


Multi-agent AI systems are moving from research demos to production workloads. The pattern is straightforward: instead of one monolithic LLM call, you decompose a complex task into specialized agents — a planner, a researcher, an executor, a critic — each responsible for a specific capability. The orchestration challenge is where most teams get stuck.

This post presents a production architecture for multi-agent orchestration on Azure Container Apps. We chose Container Apps over Functions, Kubernetes, and custom VMs for specific reasons, and we walk through the communication patterns, state management, scaling, observability, and infrastructure-as-code needed to deploy the entire platform.

Why Azure Container Apps for Agents

The requirements for a multi-agent platform are:

  1. Scale-to-zero: Agents should not burn compute when idle
  2. Independent scaling: Each agent type scales based on its own workload
  3. Service-to-service communication: Agents need to call each other reliably
  4. State management: Agents need shared and private state without managing databases
  5. Observability: Distributed traces across agent interactions
  6. Cost control: Pay only for active compute

Azure Container Apps provides all six. Here is the comparison:

| Requirement | Container Apps | AKS | Azure Functions |
| --- | --- | --- | --- |
| Scale-to-zero | Yes (KEDA) | Yes (KEDA, but cluster overhead) | Yes |
| Independent scaling | Yes (per-app KEDA rules) | Yes | Yes |
| Service-to-service | Dapr built-in | Manual or Dapr sidecar | Limited |
| State management | Dapr state stores | Manual or Dapr sidecar | Durable entities |
| GPU support | Yes (workload profiles) | Yes | No |
| Container flexibility | Full | Full | Runtime constraints |
| Ops overhead | Low (serverless) | High (cluster mgmt) | Low |

Container Apps gives you the container flexibility of Kubernetes, the serverless economics of Functions, and Dapr built in without managing sidecars yourself.

Architecture Overview

[Diagram: platform architecture overview]

Agent Types

Orchestrator Agent: Receives incoming tasks, decomposes them into subtasks, assigns them to specialist agents, and aggregates results. Scales based on incoming request queue depth.

Researcher Agent: Performs RAG queries, web searches, document analysis. Scales based on research task queue depth. May require more memory for large context windows.

Executor Agent: Performs actions — API calls, database writes, code execution in sandboxed environments. Scales based on execution task queue depth. Requires strict permission boundaries.

Critic Agent: Evaluates outputs from other agents against quality criteria. Scales based on evaluation queue depth. Lightweight compute requirements.
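The four agent types above can be captured in a small shared routing table. A sketch — the registry name, resource figures, and `scale_to_zero` flags below are illustrative assumptions, though the topic names follow the `agent.<type>.tasks` convention used throughout this post:

```python
# Hypothetical registry mapping each agent type to its task topic and
# resource profile. Values are illustrative, not prescriptive.
AGENT_REGISTRY = {
    "orchestrator": {"topic": "agent.orchestrator.tasks", "memory_gb": 2, "scale_to_zero": False},
    "researcher":   {"topic": "agent.researcher.tasks",   "memory_gb": 4, "scale_to_zero": True},
    "executor":     {"topic": "agent.executor.tasks",     "memory_gb": 2, "scale_to_zero": True},
    "critic":       {"topic": "agent.critic.tasks",       "memory_gb": 1, "scale_to_zero": True},
}

def task_topic(agent_type: str) -> str:
    """Resolve the pub/sub topic an agent listens on."""
    return AGENT_REGISTRY[agent_type]["topic"]
```

Centralizing this keeps topic names consistent between the publishing code and the KEDA scale rules, which reference the same topics.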

Agent Task Flow

[Diagram: agent task flow]

Agent Communication Patterns

Pattern 1: Event-Driven via Service Bus (Recommended)

Asynchronous, decoupled, resilient. Each agent publishes results to a topic and subscribes to its own task topic.

Python
# agent_base.py — Base class for all agents
from dapr.clients import DaprClient
import json
import uuid
from datetime import datetime

class AgentBase:
    def __init__(self, agent_type: str):
        self.agent_type = agent_type
        self.client = DaprClient()
        self.pubsub_name = "agent-pubsub"  # Dapr component name

    async def publish_task(self, target_agent: str, task: dict):
        """Publish a task for another agent to process."""
        message = {
            "task_id": str(uuid.uuid4()),
            "source_agent": self.agent_type,
            "target_agent": target_agent,
            "payload": task,
            "correlation_id": task.get("correlation_id", str(uuid.uuid4())),
            "timestamp": datetime.utcnow().isoformat(),
        }
        self.client.publish_event(
            pubsub_name=self.pubsub_name,
            topic_name=f"agent.{target_agent}.tasks",
            data=json.dumps(message),
            data_content_type="application/json",
        )

    async def publish_result(self, task_id: str, correlation_id: str,
                              result: dict):
        """Publish the result of a completed task."""
        message = {
            "task_id": task_id,
            "source_agent": self.agent_type,
            "correlation_id": correlation_id,
            "result": result,
            "status": "completed",
            "timestamp": datetime.utcnow().isoformat(),
        }
        self.client.publish_event(
            pubsub_name=self.pubsub_name,
            topic_name="agent.results",
            data=json.dumps(message),
            data_content_type="application/json",
        )
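On the subscriber side, each worker should validate the envelope before acting on it. A minimal standard-library sketch — `parse_task_envelope` is a hypothetical helper, but the field names match the message built by `publish_task` above:

```python
REQUIRED_FIELDS = ("task_id", "source_agent", "target_agent", "payload", "correlation_id")

def parse_task_envelope(event_data: dict) -> dict:
    """Validate an incoming task message and return its payload.

    Raising ValueError on a malformed envelope lets the message fail
    redelivery and eventually dead-letter after maxDeliveryCount attempts.
    """
    missing = [f for f in REQUIRED_FIELDS if f not in event_data]
    if missing:
        raise ValueError(f"Malformed task envelope, missing fields: {missing}")
    return event_data["payload"]
```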

Dapr Pub/Sub component configuration for Service Bus:

YAML
# components/pubsub.yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: agent-pubsub
spec:
  type: pubsub.azure.servicebus.topics
  version: v1
  metadata:
    - name: connectionString
      secretKeyRef:
        name: servicebus-connection
        key: connectionString
    - name: maxDeliveryCount
      value: "5"
    - name: lockDurationInSec
      value: "60"
    - name: defaultMessageTimeToLiveInSec
      value: "3600"
    - name: maxConcurrentHandlers
      value: "10"

Pattern 2: Direct HTTP via Dapr Service Invocation

For synchronous, low-latency agent-to-agent calls when you need an immediate response.

Python
async def invoke_agent_directly(self, target_agent: str, method: str,
                                 data: dict, timeout: int = 30):
    """Synchronous agent-to-agent call via Dapr service invocation."""
    response = self.client.invoke_method(
        app_id=target_agent,
        method_name=method,
        data=json.dumps(data),
        content_type="application/json",
        http_verb="POST",
        timeout=timeout,
    )
    return json.loads(response.data)
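Direct invocation has no broker to absorb transient failures, so a small retry wrapper is worth the extra lines. A standard-library sketch (the wrapped call is passed in as a callable, so nothing here depends on the Dapr SDK):

```python
import asyncio
import random

async def with_retries(call, attempts: int = 3, base_delay: float = 0.5):
    """Retry an async call with exponential backoff and jitter."""
    for attempt in range(attempts):
        try:
            return await call()
        except Exception:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            # Exponential backoff: base, 2x, 4x, ... plus a little jitter
            await asyncio.sleep(base_delay * 2 ** attempt + random.uniform(0, 0.1))
```

Usage: `await with_retries(lambda: self.invoke_agent_directly("critic", "evaluate", data))` — the agent name and method here are examples from this post's architecture.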

When to Use Which Pattern

| Scenario | Pattern | Reason |
| --- | --- | --- |
| Task decomposition and assignment | Event-driven | Decouple orchestrator from worker availability |
| Critic evaluating executor output | Event-driven | Critic can batch evaluations |
| Orchestrator checking agent health | Direct HTTP | Need immediate response |
| Quick validation before task execution | Direct HTTP | Latency-sensitive |
| Fan-out to multiple agents | Event-driven | Parallel processing, no blocking |
| Final result aggregation | Event-driven | Collect results as they arrive |

State Management with Dapr

Agents need state for task tracking, conversation context, and coordination. Dapr state stores abstract the backend.

Python
class AgentStateManager:
    def __init__(self, store_name: str = "agent-statestore"):
        self.client = DaprClient()
        self.store_name = store_name

    async def save_task_state(self, task_id: str, state: dict):
        """Save task state with optimistic concurrency."""
        self.client.save_state(
            store_name=self.store_name,
            key=f"task:{task_id}",
            value=json.dumps(state),
            state_metadata={"contentType": "application/json"},
        )

    async def get_task_state(self, task_id: str) -> dict:
        """Retrieve task state."""
        response = self.client.get_state(
            store_name=self.store_name,
            key=f"task:{task_id}",
        )
        if response.data:
            return json.loads(response.data)
        return {}

    async def save_conversation_context(self, correlation_id: str,
                                         messages: list, metadata: dict):
        """Persist conversation context across agent interactions."""
        state = {
            "messages": messages[-50:],
            "metadata": metadata,
            "updated_at": datetime.utcnow().isoformat(),
        }
        self.client.save_state(
            store_name=self.store_name,
            key=f"context:{correlation_id}",
            value=json.dumps(state),
        )

    async def acquire_task_lock(self, task_id: str, owner: str,
                                 ttl_seconds: int = 30) -> bool:
        """Distributed lock to prevent concurrent task processing."""
        try:
            lock_response = self.client.try_lock(
                store_name="agent-lockstore",
                resource_id=f"task-lock:{task_id}",
                lock_owner=owner,
                expiry_in_seconds=ttl_seconds,
            )
            return lock_response.success
        except Exception:
            return False

State store component for Redis:

YAML
# components/statestore.yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: agent-statestore
spec:
  type: state.redis
  version: v1
  metadata:
    - name: redisHost
      value: "agent-redis.redis.cache.windows.net:6380"
    - name: redisPassword
      secretKeyRef:
        name: redis-password
        key: password
    - name: enableTLS
      value: "true"
    - name: actorStateStore
      value: "true"

The Orchestrator Agent: Detailed Implementation

The orchestrator is the central coordinator. It decomposes tasks, assigns work, tracks progress, and aggregates results.

Python
import os
import asyncio
from fastapi import FastAPI
from dapr.ext.fastapi import DaprApp
from openai import AzureOpenAI

app = FastAPI()
dapr_app = DaprApp(app)

class OrchestratorAgent(AgentBase):
    def __init__(self):
        super().__init__("orchestrator")
        self.state = AgentStateManager()
        self.llm = AzureOpenAI(
            azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
            api_version="2025-04-01-preview",
        )

    async def decompose_task(self, task: dict) -> list:
        """Use LLM to decompose a complex task into subtasks."""
        response = self.llm.chat.completions.create(
            model="gpt-4o",
            messages=[
                {"role": "system", "content": DECOMPOSITION_PROMPT},
                {"role": "user", "content": json.dumps(task)},
            ],
            response_format={"type": "json_object"},
            temperature=0.1,
        )
        plan = json.loads(response.choices[0].message.content)
        return plan["subtasks"]

    async def handle_new_task(self, task: dict):
        """Main entry point for new tasks."""
        correlation_id = task.get("correlation_id", str(uuid.uuid4()))

        subtasks = await self.decompose_task(task)

        task_state = {
            "correlation_id": correlation_id,
            "status": "in_progress",
            "total_subtasks": len(subtasks),
            "completed_subtasks": 0,
            "results": {},
            "created_at": datetime.utcnow().isoformat(),
        }
        await self.state.save_task_state(correlation_id, task_state)

        for subtask in subtasks:
            target_agent = subtask["assigned_agent"]
            await self.publish_task(target_agent, {
                "correlation_id": correlation_id,
                "subtask_id": subtask["id"],
                "instruction": subtask["instruction"],
                "context": subtask.get("context", {}),
                "dependencies": subtask.get("dependencies", []),
            })

    async def handle_result(self, result: dict):
        """Process a result from a worker agent."""
        correlation_id = result["correlation_id"]

        # Bounded retry on the distributed lock instead of unbounded
        # recursion; the lock TTL guarantees eventual release if a holder dies.
        lock_owner = f"orchestrator-{uuid.uuid4().hex[:8]}"
        for _ in range(20):
            if await self.state.acquire_task_lock(correlation_id, lock_owner):
                break
            await asyncio.sleep(0.5)
        else:
            raise TimeoutError(f"Could not lock task {correlation_id}")

        try:
            task_state = await self.state.get_task_state(correlation_id)
            task_state["results"][result["task_id"]] = result["result"]
            task_state["completed_subtasks"] += 1

            if task_state["completed_subtasks"] >= task_state["total_subtasks"]:
                task_state["status"] = "aggregating"
                await self.state.save_task_state(correlation_id, task_state)
                await self._aggregate_and_respond(correlation_id, task_state)
            else:
                await self.state.save_task_state(correlation_id, task_state)
        finally:
            # Release explicitly rather than waiting for the TTL to expire
            self.state.client.unlock(
                store_name="agent-lockstore",
                resource_id=f"task-lock:{correlation_id}",
                lock_owner=lock_owner,
            )

orchestrator = OrchestratorAgent()

@dapr_app.subscribe(pubsub="agent-pubsub", topic="agent.orchestrator.tasks")
async def handle_task(event: dict):
    # Dapr delivers a CloudEvent envelope; the payload sits under "data"
    await orchestrator.handle_new_task(event["data"])

@dapr_app.subscribe(pubsub="agent-pubsub", topic="agent.results")
async def handle_agent_result(event: dict):
    await orchestrator.handle_result(event["data"])
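`DECOMPOSITION_PROMPT` is referenced above but not shown. A plausible version, together with a validator for the JSON plan the LLM returns — both are illustrative sketches to adapt to your own task taxonomy, not part of any fixed API:

```python
DECOMPOSITION_PROMPT = """You are a task planner. Decompose the user's task into
subtasks for these agents: researcher, executor, critic. Respond with JSON:
{"subtasks": [{"id": "...", "assigned_agent": "...", "instruction": "...",
"dependencies": []}]}"""

VALID_AGENTS = {"researcher", "executor", "critic"}

def validate_plan(plan: dict) -> list:
    """Reject malformed or mis-routed plans before publishing subtasks.

    LLM output is untrusted input: a hallucinated agent name would publish
    to a topic with no subscriber and the task would stall silently.
    """
    subtasks = plan.get("subtasks", [])
    if not subtasks:
        raise ValueError("LLM returned an empty plan")
    for sub in subtasks:
        if sub.get("assigned_agent") not in VALID_AGENTS:
            raise ValueError(f"Unknown agent: {sub.get('assigned_agent')}")
    return subtasks
```

Calling a validator like this between `decompose_task` and `publish_task` turns a silent stall into a fast, observable failure.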

Scaling with KEDA

Each agent scales independently based on its workload. KEDA (built into Container Apps) provides custom scaling rules.

YAML
# Container Apps scaling configuration
properties:
  template:
    scale:
      minReplicas: 0
      maxReplicas: 20
      rules:
        - name: servicebus-topic-depth
          custom:
            type: azure-servicebus
            metadata:
              # The platform publishes to topics, so scale on a topic
              # subscription rather than a queue
              topicName: "agent.researcher.tasks"
              subscriptionName: "researcher"
              messageCount: "5"
            auth:
              - secretRef: servicebus-connection
                triggerParameter: connection
        - name: cpu-utilization
          custom:
            type: cpu
            metadata:
              type: Utilization
              value: "70"

Scaling strategy per agent type:

| Agent | Min Replicas | Max Replicas | Scale Trigger | Scale Threshold |
| --- | --- | --- | --- | --- |
| Orchestrator | 1 | 5 | Queue depth | 10 messages |
| Researcher | 0 | 20 | Queue depth | 5 messages |
| Executor | 0 | 10 | Queue depth | 3 messages |
| Critic | 0 | 10 | Queue depth | 5 messages |

Keep the orchestrator at minimum 1 replica for fast response to new tasks. Worker agents scale to zero when idle.

Observability with OpenTelemetry

Distributed tracing across agents is essential for debugging and performance optimization.

Python
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.resources import Resource
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace.export import BatchSpanProcessor

def setup_telemetry(service_name: str):
    provider = TracerProvider(resource=Resource.create({
        "service.name": service_name,
        "service.namespace": "agent-platform",
    }))
    exporter = OTLPSpanExporter(
        endpoint=os.environ.get("OTEL_EXPORTER_OTLP_ENDPOINT",
                                "http://otel-collector:4317")
    )
    provider.add_span_processor(BatchSpanProcessor(exporter))
    trace.set_tracer_provider(provider)
    return trace.get_tracer(service_name)

tracer = setup_telemetry("orchestrator-agent")

# Usage in agent methods
async def decompose_task(self, task: dict):
    with tracer.start_as_current_span("decompose_task") as span:
        span.set_attribute("task.correlation_id", task.get("correlation_id"))
        span.set_attribute("task.type", task.get("type", "unknown"))

        subtasks = await self._call_llm_for_decomposition(task)

        span.set_attribute("subtasks.count", len(subtasks))
        return subtasks
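Dapr can forward W3C trace context through pub/sub when tracing is enabled, but stamping it into your own message envelope as well lets you correlate spans even where messages leave the Dapr path. A standard-library sketch of the W3C `traceparent` format — both helpers are illustrative, and in practice you would use the OpenTelemetry propagation API instead of hand-rolling the header:

```python
import secrets

def new_traceparent() -> str:
    """Build a W3C traceparent header: version-traceid-spanid-flags."""
    trace_id = secrets.token_hex(16)  # 32 hex chars identify the whole trace
    span_id = secrets.token_hex(8)    # 16 hex chars identify this hop
    return f"00-{trace_id}-{span_id}-01"

def child_traceparent(parent: str) -> str:
    """Keep the trace ID, mint a new span ID for the next agent hop."""
    version, trace_id, _, flags = parent.split("-")
    return f"{version}-{trace_id}-{secrets.token_hex(8)}-{flags}"
```

An orchestrator would call `new_traceparent()` once per incoming task, store it alongside the correlation ID, and each agent would derive a child for its outgoing messages — so one trace ID ties together every span in a task's lifetime.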

Infrastructure as Code: Bicep Deployment

The complete platform deployed with a single Bicep template.

Bicep
// main.bicep — Multi-agent orchestration platform
param location string = 'westeurope'
param environmentName string = 'agent-platform'

// Container Apps Environment
resource containerEnv 'Microsoft.App/managedEnvironments@2024-03-01' = {
  name: '${environmentName}-env'
  location: location
  properties: {
    daprAIConnectionString: appInsights.properties.ConnectionString
    appLogsConfiguration: {
      destination: 'log-analytics'
      logAnalyticsConfiguration: {
        customerId: logAnalytics.properties.customerId
        sharedKey: logAnalytics.listKeys().primarySharedKey
      }
    }
    workloadProfiles: [
      { name: 'Consumption', workloadProfileType: 'Consumption' }
      {
        name: 'gpu-agents'
        workloadProfileType: 'NC24-A100'
        minimumCount: 0
        maximumCount: 3
      }
    ]
  }
}

// Dapr Pub/Sub Component — Service Bus
resource daprPubsub 'Microsoft.App/managedEnvironments/daprComponents@2024-03-01' = {
  parent: containerEnv
  name: 'agent-pubsub'
  properties: {
    componentType: 'pubsub.azure.servicebus.topics'
    version: 'v1'
    secrets: [
      {
        name: 'sb-connection'
        value: serviceBus.listKeys().primaryConnectionString
      }
    ]
    metadata: [
      { name: 'connectionString', secretRef: 'sb-connection' }
      { name: 'maxDeliveryCount', value: '5' }
    ]
    scopes: ['orchestrator', 'researcher', 'executor', 'critic']
  }
}

// Dapr State Store — Redis
resource daprStateStore 'Microsoft.App/managedEnvironments/daprComponents@2024-03-01' = {
  parent: containerEnv
  name: 'agent-statestore'
  properties: {
    componentType: 'state.redis'
    version: 'v1'
    secrets: [
      { name: 'redis-password', value: redis.listKeys().primaryKey }
    ]
    metadata: [
      { name: 'redisHost', value: '${redis.properties.hostName}:6380' }
      { name: 'redisPassword', secretRef: 'redis-password' }
      { name: 'enableTLS', value: 'true' }
    ]
    scopes: ['orchestrator', 'researcher', 'executor', 'critic']
  }
}

// Orchestrator Agent
resource orchestratorApp 'Microsoft.App/containerApps@2024-03-01' = {
  name: 'orchestrator'
  location: location
  properties: {
    managedEnvironmentId: containerEnv.id
    configuration: {
      dapr: {
        enabled: true
        appId: 'orchestrator'
        appPort: 8000
      }
      ingress: {
        external: true
        targetPort: 8000
        transport: 'http'
      }
      secrets: [
        { name: 'openai-endpoint', value: openAI.properties.endpoint }
      ]
    }
    template: {
      containers: [
        {
          name: 'orchestrator'
          image: '${containerRegistry.properties.loginServer}/agents/orchestrator:latest'
          resources: { cpu: json('1.0'), memory: '2Gi' }
          env: [
            {
              name: 'AZURE_OPENAI_ENDPOINT'
              secretRef: 'openai-endpoint'
            }
            {
              name: 'OTEL_EXPORTER_OTLP_ENDPOINT'
              value: 'http://otel-collector:4317'
            }
          ]
        }
      ]
      scale: {
        minReplicas: 1
        maxReplicas: 5
        rules: [
          {
            name: 'queue-depth'
            custom: {
              type: 'azure-servicebus'
              metadata: {
                topicName: 'agent.orchestrator.tasks'
                subscriptionName: 'orchestrator'
                messageCount: '10'
              }
              auth: [
                {
                  secretRef: 'sb-connection'
                  triggerParameter: 'connection'
                }
              ]
            }
          }
        ]
      }
    }
  }
}

Deployment Pipeline

YAML
# .github/workflows/deploy-agents.yml
name: Deploy Agent Platform
on:
  push:
    branches: [main]
    paths: ['agents/**', 'infra/**']

jobs:
  deploy-infra:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: azure/login@v2
        with:
          creds: ${{ secrets.AZURE_CREDENTIALS }}
      - uses: azure/arm-deploy@v2
        with:
          resourceGroupName: rg-agent-platform
          template: ./infra/main.bicep
          parameters: environmentName=agent-platform location=westeurope

  build-and-deploy-agents:
    needs: deploy-infra
    strategy:
      matrix:
        agent: [orchestrator, researcher, executor, critic]
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: azure/docker-login@v2
        with:
          login-server: ${{ secrets.ACR_LOGIN_SERVER }}
          username: ${{ secrets.ACR_USERNAME }}
          password: ${{ secrets.ACR_PASSWORD }}
      - run: |
          docker build -t ${{ secrets.ACR_LOGIN_SERVER }}/agents/${{ matrix.agent }}:${{ github.sha }} \
            ./agents/${{ matrix.agent }}
          docker push ${{ secrets.ACR_LOGIN_SERVER }}/agents/${{ matrix.agent }}:${{ github.sha }}
      - uses: azure/container-apps-deploy-action@v2
        with:
          containerAppName: ${{ matrix.agent }}
          resourceGroup: rg-agent-platform
          imageToDeploy: >-
            ${{ secrets.ACR_LOGIN_SERVER }}/agents/${{ matrix.agent }}:${{ github.sha }}

Production Considerations

Idempotency: Every agent operation must be idempotent. Service Bus may deliver messages more than once. Use the task ID as an idempotency key in the state store.
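A minimal sketch of that idempotency check — here a plain dict stands in for the Dapr state store, and `process_once` is a hypothetical helper name:

```python
def process_once(task_id: str, handler, state_store: dict):
    """Run handler(task_id) at most once per task ID.

    On redelivery the stored result is replayed instead of re-running
    the side effect. A dict stands in for the Dapr state store here.
    """
    key = f"processed:{task_id}"
    if key in state_store:
        return state_store[key]      # duplicate delivery: replay the result
    result = handler(task_id)
    state_store[key] = result        # record before acknowledging the message
    return result
```

With a real state store there is still a small window between handler and write; for strict exactly-once semantics combine this with the distributed lock shown earlier.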

Timeout handling: Set realistic timeouts for LLM calls (30-60 seconds for GPT-4o). Implement circuit breakers for cascading failures when Azure OpenAI has latency spikes.
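A compact sketch of such a circuit breaker for the LLM client path — the thresholds are illustrative, and a maintained resilience library is preferable in production:

```python
import time

class CircuitBreaker:
    """Open after `threshold` consecutive failures; probe again after `cooldown` seconds."""

    def __init__(self, threshold: int = 5, cooldown: float = 30.0):
        self.threshold = threshold
        self.cooldown = cooldown
        self.failures = 0
        self.opened_at = 0.0

    def call(self, fn, *args, **kwargs):
        if self.failures >= self.threshold:
            if time.monotonic() - self.opened_at < self.cooldown:
                # Fail fast instead of piling requests onto a struggling backend
                raise RuntimeError("circuit open: skipping LLM call")
            self.failures = 0  # half-open: allow one probe call through
        try:
            result = fn(*args, **kwargs)
            self.failures = 0
            return result
        except Exception:
            self.failures += 1
            if self.failures >= self.threshold:
                self.opened_at = time.monotonic()
            raise
```

Wrapping the `chat.completions.create` call in `breaker.call(...)` keeps a latency spike in Azure OpenAI from cascading into every queued agent task.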

Cost control: Set max replicas conservatively. Use Consumption workload profiles for CPU-only agents. Reserve GPU profiles only for agents that need embedding generation or local model inference.

Security: Each agent should have its own managed identity with least-privilege access. The executor agent needs the most restricted permissions — never give it broad Azure RBAC roles.


Conceptualise designs and deploys multi-agent orchestration platforms on Azure Container Apps — from architecture through production operations. If you are building AI agent systems that need enterprise reliability, contact us at mbrahim@conceptualise.de.

Topics

multi-agent orchestration · Azure Container Apps agents · Dapr state management · KEDA autoscaling AI · agent communication patterns

Frequently Asked Questions

Why not just use Azure Functions for multi-agent systems?

Azure Functions excels at single-purpose, event-triggered workloads. Multi-agent systems need long-running processes, inter-agent communication, shared state, and fine-grained resource control. Container Apps provides scale-to-zero like Functions, but adds Dapr for service-to-service communication and state management, KEDA for custom scaling rules, and full container flexibility for GPU workloads or specialized runtimes.
