Zum Hauptinhalt springen
Alle Beiträge
KI & Daten8 Min. Lesezeit

Multi-Agent-Orchestrierungsplattform auf Azure Container Apps aufbauen

Architekturleitfaden für Multi-Agent-KI-Orchestrierung auf Azure Container Apps — KEDA-Skalierung, Dapr State Management, Service-Bus-Kommunikation, OpenTelemetry-Observability und IaC-Deployment.

Veröffentlicht

Multi-Agent-KI-Systeme bewegen sich von Forschungsdemos zu Produktions-Workloads. Das Muster ist einfach: Statt eines monolithischen LLM-Aufrufs zerlegen Sie eine komplexe Aufgabe in spezialisierte Agenten — einen Planner, einen Researcher, einen Executor, einen Critic — jeder verantwortlich für eine spezifische Fähigkeit. Die Orchestrierungsherausforderung ist der Punkt, an dem die meisten Teams stecken bleiben.

Dieser Beitrag präsentiert eine Produktionsarchitektur für Multi-Agent-Orchestrierung auf Azure Container Apps. Wir haben Container Apps gegenüber Functions, Kubernetes und eigenen VMs aus spezifischen Gründen gewählt. Wir behandeln Kommunikationsmuster, State Management, Skalierung, Observability und Infrastructure-as-Code zum Deployment der gesamten Plattform.

Warum Azure Container Apps für Agenten

Die Anforderungen an eine Multi-Agent-Plattform sind:

  1. Scale-to-Zero: Agenten sollten kein Compute verbrennen, wenn sie untätig sind
  2. Unabhängige Skalierung: Jeder Agent-Typ skaliert basierend auf seinem eigenen Workload
  3. Service-to-Service-Kommunikation: Agenten müssen sich zuverlässig gegenseitig aufrufen können
  4. State Management: Agenten brauchen gemeinsamen und privaten State ohne Datenbankverwaltung
  5. Observability: Verteilte Traces über Agent-Interaktionen
  6. Kostenkontrolle: Nur für aktives Compute bezahlen

Azure Container Apps bietet alle sechs. Hier der Vergleich:

AnforderungContainer AppsAKSAzure Functions
Scale-to-ZeroJa (KEDA)Ja (KEDA, aber Cluster-Overhead)Ja
Unabhängige SkalierungJa (per-App KEDA Rules)JaJa
Service-to-ServiceDapr integriertManuell oder Dapr SidecarEingeschränkt
State ManagementDapr State StoresManuell oder Dapr SidecarDurable Entities
GPU-UnterstützungJa (Workload Profiles)JaNein
Container-FlexibilitätVollVollRuntime-Einschränkungen
Ops-OverheadNiedrig (Serverless)Hoch (Cluster-Mgmt)Niedrig

Container Apps gibt Ihnen die Container-Flexibilität von Kubernetes, die Serverless-Ökonomie von Functions und Dapr integriert, ohne Sidecars selbst verwalten zu müssen.

Architekturübersicht

Loading diagram...

Aufgabenfluss

Loading diagram...

Agent-Typen

Orchestrator Agent: Empfängt eingehende Aufgaben, zerlegt sie in Teilaufgaben, weist sie Spezialagenten zu, aggregiert Ergebnisse. Skaliert basierend auf der Tiefe der eingehenden Request-Queue.

Researcher Agent: Führt RAG-Abfragen, Web-Recherchen, Dokumentenanalysen durch. Skaliert basierend auf der Research-Task-Queue-Tiefe. Benötigt möglicherweise mehr Speicher für große Kontextfenster.

Executor Agent: Führt Aktionen aus — API-Aufrufe, Datenbankschreibvorgänge, Code-Ausführung in Sandbox-Umgebungen. Skaliert basierend auf der Execution-Task-Queue-Tiefe. Erfordert strenge Berechtigungsgrenzen.

Critic Agent: Evaluiert Outputs anderer Agenten gegen Qualitätskriterien. Skaliert basierend auf der Evaluierungs-Queue-Tiefe. Leichtgewichtige Compute-Anforderungen.

Agent-Kommunikationsmuster

Muster 1: Ereignisgesteuert via Service Bus (Empfohlen)

Asynchron, entkoppelt, resilient. Jeder Agent veröffentlicht Ergebnisse zu einem Topic und abonniert sein eigenes Aufgaben-Topic.

Python
# agent_base.py — Basisklasse für alle Agenten
from dapr.clients import DaprClient
import json
import uuid
from datetime import datetime

class AgentBase:
    def __init__(self, agent_type: str):
        self.agent_type = agent_type
        self.client = DaprClient()
        self.pubsub_name = "agent-pubsub"

    async def publish_task(self, target_agent: str, task: dict):
        """Veröffentlicht eine Aufgabe zur Verarbeitung durch einen anderen Agenten."""
        message = {
            "task_id": str(uuid.uuid4()),
            "source_agent": self.agent_type,
            "target_agent": target_agent,
            "payload": task,
            "correlation_id": task.get("correlation_id", str(uuid.uuid4())),
            "timestamp": datetime.utcnow().isoformat(),
        }
        self.client.publish_event(
            pubsub_name=self.pubsub_name,
            topic_name=f"agent.{target_agent}.tasks",
            data=json.dumps(message),
            data_content_type="application/json",
        )

    async def publish_result(self, task_id: str, correlation_id: str,
                              result: dict):
        """Veröffentlicht das Ergebnis einer abgeschlossenen Aufgabe."""
        message = {
            "task_id": task_id,
            "source_agent": self.agent_type,
            "correlation_id": correlation_id,
            "result": result,
            "status": "completed",
            "timestamp": datetime.utcnow().isoformat(),
        }
        self.client.publish_event(
            pubsub_name=self.pubsub_name,
            topic_name="agent.results",
            data=json.dumps(message),
            data_content_type="application/json",
        )

Dapr-Pub/Sub-Komponentenkonfiguration für Service Bus:

YAML
# components/pubsub.yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: agent-pubsub
spec:
  type: pubsub.azure.servicebus.topics
  version: v1
  metadata:
    - name: connectionString
      secretKeyRef:
        name: servicebus-connection
        key: connectionString
    - name: maxDeliveryCount
      value: "5"
    - name: lockDurationInSec
      value: "60"
    - name: maxConcurrentHandlers
      value: "10"

Muster 2: Direktes HTTP via Dapr Service Invocation

Für synchrone, latenzarme Agent-zu-Agent-Aufrufe, wenn eine sofortige Antwort benötigt wird.

Python
async def invoke_agent_directly(self, target_agent: str, method: str,
                                 data: dict, timeout: int = 30):
    """Synchroner Agent-zu-Agent-Aufruf via Dapr Service Invocation."""
    response = self.client.invoke_method(
        app_id=target_agent,
        method_name=method,
        data=json.dumps(data),
        content_type="application/json",
        http_verb="POST",
        timeout=timeout,
    )
    return json.loads(response.data)

Wann welches Muster verwenden

SzenarioMusterBegründung
Aufgabenzerlegung und -zuweisungEreignisgesteuertOrchestrator von Worker-Verfügbarkeit entkoppeln
Critic evaluiert Executor-OutputEreignisgesteuertCritic kann Evaluierungen bündeln
Orchestrator prüft Agent-HealthDirektes HTTPSofortige Antwort nötig
Schnelle Validierung vor AufgabenausführungDirektes HTTPLatenzsensitiv
Fan-Out an mehrere AgentenEreignisgesteuertParallele Verarbeitung, kein Blockieren

State Management mit Dapr

Agenten brauchen State für Aufgabenverfolgung, Konversationskontext und Koordination.

Python
class AgentStateManager:
    def __init__(self, store_name: str = "agent-statestore"):
        self.client = DaprClient()
        self.store_name = store_name

    async def save_task_state(self, task_id: str, state: dict):
        """Speichert Aufgaben-State mit optimistischer Nebenläufigkeit."""
        self.client.save_state(
            store_name=self.store_name,
            key=f"task:{task_id}",
            value=json.dumps(state),
            state_metadata={"contentType": "application/json"},
        )

    async def get_task_state(self, task_id: str) -> dict:
        """Ruft Aufgaben-State ab."""
        response = self.client.get_state(
            store_name=self.store_name,
            key=f"task:{task_id}",
        )
        if response.data:
            return json.loads(response.data)
        return {}

    async def acquire_task_lock(self, task_id: str, owner: str,
                                 ttl_seconds: int = 30) -> bool:
        """Distributed Lock gegen gleichzeitige Aufgabenverarbeitung."""
        try:
            lock_response = self.client.try_lock(
                store_name="agent-lockstore",
                resource_id=f"task-lock:{task_id}",
                lock_owner=owner,
                expiry_in_seconds=ttl_seconds,
            )
            return lock_response.success
        except Exception:
            return False

Der Orchestrator Agent: Detaillierte Implementierung

Der Orchestrator ist der zentrale Koordinator. Er zerlegt Aufgaben, weist Arbeit zu, verfolgt den Fortschritt und aggregiert Ergebnisse.

Python
import os
import asyncio
from fastapi import FastAPI
from dapr.ext.fastapi import DaprApp
from openai import AzureOpenAI

app = FastAPI()
dapr_app = DaprApp(app)

class OrchestratorAgent(AgentBase):
    def __init__(self):
        super().__init__("orchestrator")
        self.state = AgentStateManager()
        self.llm = AzureOpenAI(
            azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
            api_version="2025-04-01-preview",
        )

    async def decompose_task(self, task: dict) -> list:
        """Nutzt LLM zur Zerlegung einer komplexen Aufgabe in Teilaufgaben."""
        response = self.llm.chat.completions.create(
            model="gpt-4o",
            messages=[
                {"role": "system", "content": DECOMPOSITION_PROMPT},
                {"role": "user", "content": json.dumps(task)},
            ],
            response_format={"type": "json_object"},
            temperature=0.1,
        )
        plan = json.loads(response.choices[0].message.content)
        return plan["subtasks"]

    async def handle_new_task(self, task: dict):
        """Haupteinstiegspunkt für neue Aufgaben."""
        correlation_id = task.get("correlation_id", str(uuid.uuid4()))
        subtasks = await self.decompose_task(task)

        task_state = {
            "correlation_id": correlation_id,
            "status": "in_progress",
            "total_subtasks": len(subtasks),
            "completed_subtasks": 0,
            "results": {},
        }
        await self.state.save_task_state(correlation_id, task_state)

        for subtask in subtasks:
            target_agent = subtask["assigned_agent"]
            await self.publish_task(target_agent, {
                "correlation_id": correlation_id,
                "subtask_id": subtask["id"],
                "instruction": subtask["instruction"],
            })

orchestrator = OrchestratorAgent()

@dapr_app.subscribe(pubsub="agent-pubsub", topic="agent.orchestrator.tasks")
async def handle_task(event: dict):
    await orchestrator.handle_new_task(event.data)

@dapr_app.subscribe(pubsub="agent-pubsub", topic="agent.results")
async def handle_agent_result(event: dict):
    await orchestrator.handle_result(event.data)

Skalierung mit KEDA

Jeder Agent skaliert unabhängig basierend auf seinem Workload. KEDA (in Container Apps integriert) bietet benutzerdefinierte Skalierungsregeln.

Skalierungsstrategie pro Agent-Typ:

AgentMin ReplicasMax ReplicasSkalierungs-TriggerSchwellenwert
Orchestrator15Queue-Tiefe10 Nachrichten
Researcher020Queue-Tiefe5 Nachrichten
Executor010Queue-Tiefe3 Nachrichten
Critic010Queue-Tiefe5 Nachrichten

Halten Sie den Orchestrator auf mindestens 1 Replica für schnelle Reaktion auf neue Aufgaben. Worker-Agenten skalieren auf Null, wenn sie untätig sind.

Observability mit OpenTelemetry

Verteiltes Tracing über Agenten hinweg ist essenziell für Debugging und Performance-Optimierung.

Python
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.resources import Resource
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace.export import BatchSpanProcessor

def setup_telemetry(service_name: str):
    provider = TracerProvider(resource=Resource.create({
        "service.name": service_name,
        "service.namespace": "agent-platform",
    }))
    exporter = OTLPSpanExporter(
        endpoint=os.environ.get("OTEL_EXPORTER_OTLP_ENDPOINT",
                                "http://otel-collector:4317")
    )
    provider.add_span_processor(BatchSpanProcessor(exporter))
    trace.set_tracer_provider(provider)
    return trace.get_tracer(service_name)

tracer = setup_telemetry("orchestrator-agent")

Infrastructure as Code: Bicep-Deployment

Die komplette Plattform mit einem einzelnen Bicep-Template deployed.

Bicep
// main.bicep — Multi-Agent-Orchestrierungsplattform
param location string = 'westeurope'
param environmentName string = 'agent-platform'

resource containerEnv 'Microsoft.App/managedEnvironments@2024-03-01' = {
  name: '${environmentName}-env'
  location: location
  properties: {
    daprAIConnectionString: appInsights.properties.ConnectionString
    appLogsConfiguration: {
      destination: 'log-analytics'
      logAnalyticsConfiguration: {
        customerId: logAnalytics.properties.customerId
        sharedKey: logAnalytics.listKeys().primarySharedKey
      }
    }
    workloadProfiles: [
      { name: 'Consumption', workloadProfileType: 'Consumption' }
      {
        name: 'gpu-agents'
        workloadProfileType: 'NC24-A100'
        minimumCount: 0
        maximumCount: 3
      }
    ]
  }
}

resource orchestratorApp 'Microsoft.App/containerApps@2024-03-01' = {
  name: 'orchestrator'
  location: location
  properties: {
    managedEnvironmentId: containerEnv.id
    configuration: {
      dapr: { enabled: true, appId: 'orchestrator', appPort: 8000 }
      ingress: { external: true, targetPort: 8000, transport: 'http' }
    }
    template: {
      containers: [
        {
          name: 'orchestrator'
          image: '${containerRegistry.properties.loginServer}/agents/orchestrator:latest'
          resources: { cpu: json('1.0'), memory: '2Gi' }
        }
      ]
      scale: {
        minReplicas: 1
        maxReplicas: 5
      }
    }
  }
}

Produktionsüberlegungen

Idempotenz: Jede Agent-Operation muss idempotent sein. Service Bus kann Nachrichten mehrfach zustellen. Verwenden Sie die Task-ID als Idempotency Key im State Store.

Timeout-Behandlung: Setzen Sie realistische Timeouts für LLM-Aufrufe (30-60 Sekunden für GPT-4o). Implementieren Sie Circuit Breaker für kaskadierende Ausfälle bei Azure-OpenAI-Latenzspitzen.

Kostenkontrolle: Setzen Sie Max Replicas konservativ. Verwenden Sie Consumption Workload Profiles für CPU-Only-Agenten. Reservieren Sie GPU-Profile nur für Agenten, die Embedding-Generierung oder lokale Modell-Inferenz benötigen.

Sicherheit: Jeder Agent sollte seine eigene Managed Identity mit Least-Privilege-Zugriff haben. Der Executor Agent braucht die restriktivsten Berechtigungen — geben Sie ihm niemals breite Azure-RBAC-Rollen.


CC Conceptualise entwirft und betreibt Multi-Agent-Orchestrierungsplattformen auf Azure Container Apps — von der Architektur bis zum Produktionsbetrieb. Wenn Sie KI-Agent-Systeme mit Enterprise-Zuverlässigkeit aufbauen, kontaktieren Sie uns unter mbrahim@conceptualise.de.

Themen

Multi-Agent-OrchestrierungAzure Container Apps AgentenDapr State ManagementKEDA Autoscaling KIAgent-Kommunikationsmuster

Häufig gestellte Fragen

Azure Functions eignet sich hervorragend für einzelne, ereignisgesteuerte Workloads. Multi-Agent-Systeme brauchen langlebige Prozesse, Inter-Agent-Kommunikation, gemeinsamen State und feingranulare Ressourcenkontrolle. Container Apps bietet Scale-to-Zero wie Functions, fügt aber Dapr für Service-to-Service-Kommunikation und State Management, KEDA für benutzerdefinierte Skalierungsregeln und volle Container-Flexibilität für GPU-Workloads hinzu.

Expert engagement

Brauchen Sie Expertenberatung?

Unser Team ist spezialisiert auf Cloud-Architektur, Security, KI-Plattformen und DevSecOps. Lassen Sie uns besprechen, wie wir Ihrem Unternehmen helfen können.

Kontakt aufnehmenNo commitment · No sales pressure

Verwandte Artikel

Alle Beiträge