Multi-Agent-Orchestrierungsplattform auf Azure Container Apps aufbauen
Architekturleitfaden für Multi-Agent-KI-Orchestrierung auf Azure Container Apps — KEDA-Skalierung, Dapr State Management, Service-Bus-Kommunikation, OpenTelemetry-Observability und IaC-Deployment.
Multi-Agent-KI-Systeme bewegen sich von Forschungsdemos zu Produktions-Workloads. Das Muster ist einfach: Statt eines monolithischen LLM-Aufrufs zerlegen Sie eine komplexe Aufgabe in spezialisierte Agenten — einen Planner, einen Researcher, einen Executor, einen Critic — jeder verantwortlich für eine spezifische Fähigkeit. Die Orchestrierungsherausforderung ist der Punkt, an dem die meisten Teams stecken bleiben.
Dieser Beitrag präsentiert eine Produktionsarchitektur für Multi-Agent-Orchestrierung auf Azure Container Apps. Wir haben Container Apps gegenüber Functions, Kubernetes und eigenen VMs aus spezifischen Gründen gewählt. Wir behandeln Kommunikationsmuster, State Management, Skalierung, Observability und Infrastructure-as-Code zum Deployment der gesamten Plattform.
Warum Azure Container Apps für Agenten
Die Anforderungen an eine Multi-Agent-Plattform sind:
- Scale-to-Zero: Agenten sollten kein Compute verbrennen, wenn sie untätig sind
- Unabhängige Skalierung: Jeder Agent-Typ skaliert basierend auf seinem eigenen Workload
- Service-to-Service-Kommunikation: Agenten müssen sich zuverlässig gegenseitig aufrufen können
- State Management: Agenten brauchen gemeinsamen und privaten State ohne Datenbankverwaltung
- Observability: Verteilte Traces über Agent-Interaktionen
- Kostenkontrolle: Nur für aktives Compute bezahlen
Azure Container Apps bietet alle sechs. Hier der Vergleich:
| Anforderung | Container Apps | AKS | Azure Functions |
|---|---|---|---|
| Scale-to-Zero | Ja (KEDA) | Ja (KEDA, aber Cluster-Overhead) | Ja |
| Unabhängige Skalierung | Ja (per-App KEDA Rules) | Ja | Ja |
| Service-to-Service | Dapr integriert | Manuell oder Dapr Sidecar | Eingeschränkt |
| State Management | Dapr State Stores | Manuell oder Dapr Sidecar | Durable Entities |
| GPU-Unterstützung | Ja (Workload Profiles) | Ja | Nein |
| Container-Flexibilität | Voll | Voll | Runtime-Einschränkungen |
| Ops-Overhead | Niedrig (Serverless) | Hoch (Cluster-Mgmt) | Niedrig |
Container Apps gibt Ihnen die Container-Flexibilität von Kubernetes, die Serverless-Ökonomie von Functions und Dapr integriert, ohne Sidecars selbst verwalten zu müssen.
Architekturübersicht
Aufgabenfluss
Agent-Typen
Orchestrator Agent: Empfängt eingehende Aufgaben, zerlegt sie in Teilaufgaben, weist sie Spezialagenten zu, aggregiert Ergebnisse. Skaliert basierend auf der Tiefe der eingehenden Request-Queue.
Researcher Agent: Führt RAG-Abfragen, Web-Recherchen, Dokumentenanalysen durch. Skaliert basierend auf der Research-Task-Queue-Tiefe. Benötigt möglicherweise mehr Speicher für große Kontextfenster.
Executor Agent: Führt Aktionen aus — API-Aufrufe, Datenbankschreibvorgänge, Code-Ausführung in Sandbox-Umgebungen. Skaliert basierend auf der Execution-Task-Queue-Tiefe. Erfordert strenge Berechtigungsgrenzen.
Critic Agent: Evaluiert Outputs anderer Agenten gegen Qualitätskriterien. Skaliert basierend auf der Evaluierungs-Queue-Tiefe. Leichtgewichtige Compute-Anforderungen.
Agent-Kommunikationsmuster
Muster 1: Ereignisgesteuert via Service Bus (Empfohlen)
Asynchron, entkoppelt, resilient. Jeder Agent veröffentlicht Ergebnisse zu einem Topic und abonniert sein eigenes Aufgaben-Topic.
# agent_base.py — Basisklasse für alle Agenten
from dapr.clients import DaprClient
import json
import uuid
from datetime import datetime
class AgentBase:
def __init__(self, agent_type: str):
self.agent_type = agent_type
self.client = DaprClient()
self.pubsub_name = "agent-pubsub"
async def publish_task(self, target_agent: str, task: dict):
"""Veröffentlicht eine Aufgabe zur Verarbeitung durch einen anderen Agenten."""
message = {
"task_id": str(uuid.uuid4()),
"source_agent": self.agent_type,
"target_agent": target_agent,
"payload": task,
"correlation_id": task.get("correlation_id", str(uuid.uuid4())),
"timestamp": datetime.utcnow().isoformat(),
}
self.client.publish_event(
pubsub_name=self.pubsub_name,
topic_name=f"agent.{target_agent}.tasks",
data=json.dumps(message),
data_content_type="application/json",
)
async def publish_result(self, task_id: str, correlation_id: str,
result: dict):
"""Veröffentlicht das Ergebnis einer abgeschlossenen Aufgabe."""
message = {
"task_id": task_id,
"source_agent": self.agent_type,
"correlation_id": correlation_id,
"result": result,
"status": "completed",
"timestamp": datetime.utcnow().isoformat(),
}
self.client.publish_event(
pubsub_name=self.pubsub_name,
topic_name="agent.results",
data=json.dumps(message),
data_content_type="application/json",
)Dapr-Pub/Sub-Komponentenkonfiguration für Service Bus:
# components/pubsub.yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: agent-pubsub
spec:
type: pubsub.azure.servicebus.topics
version: v1
metadata:
- name: connectionString
secretKeyRef:
name: servicebus-connection
key: connectionString
- name: maxDeliveryCount
value: "5"
- name: lockDurationInSec
value: "60"
- name: maxConcurrentHandlers
value: "10"Muster 2: Direktes HTTP via Dapr Service Invocation
Für synchrone, latenzarme Agent-zu-Agent-Aufrufe, wenn eine sofortige Antwort benötigt wird.
async def invoke_agent_directly(self, target_agent: str, method: str,
data: dict, timeout: int = 30):
"""Synchroner Agent-zu-Agent-Aufruf via Dapr Service Invocation."""
response = self.client.invoke_method(
app_id=target_agent,
method_name=method,
data=json.dumps(data),
content_type="application/json",
http_verb="POST",
timeout=timeout,
)
return json.loads(response.data)Wann welches Muster verwenden
| Szenario | Muster | Begründung |
|---|---|---|
| Aufgabenzerlegung und -zuweisung | Ereignisgesteuert | Orchestrator von Worker-Verfügbarkeit entkoppeln |
| Critic evaluiert Executor-Output | Ereignisgesteuert | Critic kann Evaluierungen bündeln |
| Orchestrator prüft Agent-Health | Direktes HTTP | Sofortige Antwort nötig |
| Schnelle Validierung vor Aufgabenausführung | Direktes HTTP | Latenzsensitiv |
| Fan-Out an mehrere Agenten | Ereignisgesteuert | Parallele Verarbeitung, kein Blockieren |
State Management mit Dapr
Agenten brauchen State für Aufgabenverfolgung, Konversationskontext und Koordination.
class AgentStateManager:
def __init__(self, store_name: str = "agent-statestore"):
self.client = DaprClient()
self.store_name = store_name
async def save_task_state(self, task_id: str, state: dict):
"""Speichert Aufgaben-State mit optimistischer Nebenläufigkeit."""
self.client.save_state(
store_name=self.store_name,
key=f"task:{task_id}",
value=json.dumps(state),
state_metadata={"contentType": "application/json"},
)
async def get_task_state(self, task_id: str) -> dict:
"""Ruft Aufgaben-State ab."""
response = self.client.get_state(
store_name=self.store_name,
key=f"task:{task_id}",
)
if response.data:
return json.loads(response.data)
return {}
async def acquire_task_lock(self, task_id: str, owner: str,
ttl_seconds: int = 30) -> bool:
"""Distributed Lock gegen gleichzeitige Aufgabenverarbeitung."""
try:
lock_response = self.client.try_lock(
store_name="agent-lockstore",
resource_id=f"task-lock:{task_id}",
lock_owner=owner,
expiry_in_seconds=ttl_seconds,
)
return lock_response.success
except Exception:
return FalseDer Orchestrator Agent: Detaillierte Implementierung
Der Orchestrator ist der zentrale Koordinator. Er zerlegt Aufgaben, weist Arbeit zu, verfolgt den Fortschritt und aggregiert Ergebnisse.
import os
import asyncio
from fastapi import FastAPI
from dapr.ext.fastapi import DaprApp
from openai import AzureOpenAI
app = FastAPI()
dapr_app = DaprApp(app)
class OrchestratorAgent(AgentBase):
def __init__(self):
super().__init__("orchestrator")
self.state = AgentStateManager()
self.llm = AzureOpenAI(
azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
api_version="2025-04-01-preview",
)
async def decompose_task(self, task: dict) -> list:
"""Nutzt LLM zur Zerlegung einer komplexen Aufgabe in Teilaufgaben."""
response = self.llm.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": DECOMPOSITION_PROMPT},
{"role": "user", "content": json.dumps(task)},
],
response_format={"type": "json_object"},
temperature=0.1,
)
plan = json.loads(response.choices[0].message.content)
return plan["subtasks"]
async def handle_new_task(self, task: dict):
"""Haupteinstiegspunkt für neue Aufgaben."""
correlation_id = task.get("correlation_id", str(uuid.uuid4()))
subtasks = await self.decompose_task(task)
task_state = {
"correlation_id": correlation_id,
"status": "in_progress",
"total_subtasks": len(subtasks),
"completed_subtasks": 0,
"results": {},
}
await self.state.save_task_state(correlation_id, task_state)
for subtask in subtasks:
target_agent = subtask["assigned_agent"]
await self.publish_task(target_agent, {
"correlation_id": correlation_id,
"subtask_id": subtask["id"],
"instruction": subtask["instruction"],
})
orchestrator = OrchestratorAgent()
@dapr_app.subscribe(pubsub="agent-pubsub", topic="agent.orchestrator.tasks")
async def handle_task(event: dict):
await orchestrator.handle_new_task(event.data)
@dapr_app.subscribe(pubsub="agent-pubsub", topic="agent.results")
async def handle_agent_result(event: dict):
await orchestrator.handle_result(event.data)Skalierung mit KEDA
Jeder Agent skaliert unabhängig basierend auf seinem Workload. KEDA (in Container Apps integriert) bietet benutzerdefinierte Skalierungsregeln.
Skalierungsstrategie pro Agent-Typ:
| Agent | Min Replicas | Max Replicas | Skalierungs-Trigger | Schwellenwert |
|---|---|---|---|---|
| Orchestrator | 1 | 5 | Queue-Tiefe | 10 Nachrichten |
| Researcher | 0 | 20 | Queue-Tiefe | 5 Nachrichten |
| Executor | 0 | 10 | Queue-Tiefe | 3 Nachrichten |
| Critic | 0 | 10 | Queue-Tiefe | 5 Nachrichten |
Halten Sie den Orchestrator auf mindestens 1 Replica für schnelle Reaktion auf neue Aufgaben. Worker-Agenten skalieren auf Null, wenn sie untätig sind.
Observability mit OpenTelemetry
Verteiltes Tracing über Agenten hinweg ist essenziell für Debugging und Performance-Optimierung.
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.resources import Resource
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace.export import BatchSpanProcessor
def setup_telemetry(service_name: str):
provider = TracerProvider(resource=Resource.create({
"service.name": service_name,
"service.namespace": "agent-platform",
}))
exporter = OTLPSpanExporter(
endpoint=os.environ.get("OTEL_EXPORTER_OTLP_ENDPOINT",
"http://otel-collector:4317")
)
provider.add_span_processor(BatchSpanProcessor(exporter))
trace.set_tracer_provider(provider)
return trace.get_tracer(service_name)
tracer = setup_telemetry("orchestrator-agent")Infrastructure as Code: Bicep-Deployment
Die komplette Plattform mit einem einzelnen Bicep-Template deployed.
// main.bicep — Multi-Agent-Orchestrierungsplattform
param location string = 'westeurope'
param environmentName string = 'agent-platform'
resource containerEnv 'Microsoft.App/managedEnvironments@2024-03-01' = {
name: '${environmentName}-env'
location: location
properties: {
daprAIConnectionString: appInsights.properties.ConnectionString
appLogsConfiguration: {
destination: 'log-analytics'
logAnalyticsConfiguration: {
customerId: logAnalytics.properties.customerId
sharedKey: logAnalytics.listKeys().primarySharedKey
}
}
workloadProfiles: [
{ name: 'Consumption', workloadProfileType: 'Consumption' }
{
name: 'gpu-agents'
workloadProfileType: 'NC24-A100'
minimumCount: 0
maximumCount: 3
}
]
}
}
resource orchestratorApp 'Microsoft.App/containerApps@2024-03-01' = {
name: 'orchestrator'
location: location
properties: {
managedEnvironmentId: containerEnv.id
configuration: {
dapr: { enabled: true, appId: 'orchestrator', appPort: 8000 }
ingress: { external: true, targetPort: 8000, transport: 'http' }
}
template: {
containers: [
{
name: 'orchestrator'
image: '${containerRegistry.properties.loginServer}/agents/orchestrator:latest'
resources: { cpu: json('1.0'), memory: '2Gi' }
}
]
scale: {
minReplicas: 1
maxReplicas: 5
}
}
}
}Produktionsüberlegungen
Idempotenz: Jede Agent-Operation muss idempotent sein. Service Bus kann Nachrichten mehrfach zustellen. Verwenden Sie die Task-ID als Idempotency Key im State Store.
Timeout-Behandlung: Setzen Sie realistische Timeouts für LLM-Aufrufe (30-60 Sekunden für GPT-4o). Implementieren Sie Circuit Breaker für kaskadierende Ausfälle bei Azure-OpenAI-Latenzspitzen.
Kostenkontrolle: Setzen Sie Max Replicas konservativ. Verwenden Sie Consumption Workload Profiles für CPU-Only-Agenten. Reservieren Sie GPU-Profile nur für Agenten, die Embedding-Generierung oder lokale Modell-Inferenz benötigen.
Sicherheit: Jeder Agent sollte seine eigene Managed Identity mit Least-Privilege-Zugriff haben. Der Executor Agent braucht die restriktivsten Berechtigungen — geben Sie ihm niemals breite Azure-RBAC-Rollen.
CC Conceptualise entwirft und betreibt Multi-Agent-Orchestrierungsplattformen auf Azure Container Apps — von der Architektur bis zum Produktionsbetrieb. Wenn Sie KI-Agent-Systeme mit Enterprise-Zuverlässigkeit aufbauen, kontaktieren Sie uns unter mbrahim@conceptualise.de.
Themen