Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 63 additions & 66 deletions src/backend/af/orchestration/orchestration_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,20 @@
import asyncio
import logging
import uuid
from typing import List, Optional, Callable, Awaitable
from typing import List, Optional

# agent_framework imports
from agent_framework_azure_ai import AzureAIAgentClient
from agent_framework import ChatMessage, ChatOptions, WorkflowOutputEvent, AgentRunResponseUpdate, MagenticBuilder

from agent_framework import (
ChatMessage,
WorkflowOutputEvent,
MagenticBuilder,
MagenticCallbackMode,
MagenticOrchestratorMessageEvent,
MagenticAgentDeltaEvent,
MagenticAgentMessageEvent,
MagenticFinalResultEvent,
)

from common.config.app_config import config
from common.models.messages_af import TeamConfiguration
Expand All @@ -23,6 +31,7 @@
from af.orchestration.human_approval_manager import HumanApprovalMagenticManager
from af.magentic_agents.magentic_agent_factory import MagenticAgentFactory


class OrchestrationManager:
"""Manager for handling orchestration logic using agent_framework Magentic workflow."""

Expand All @@ -32,41 +41,6 @@ def __init__(self):
self.user_id: Optional[str] = None
self.logger = self.__class__.logger

# ---------------------------
# Internal callback adapters
# ---------------------------
@staticmethod
def _user_aware_agent_callback(
user_id: str,
) -> Callable[[str, ChatMessage], Awaitable[None]]:
"""Adapts agent_framework final agent ChatMessage to legacy agent_response_callback signature."""

async def _cb(agent_id: str, message: ChatMessage):
try:
agent_response_callback(agent_id, message, user_id) # Fixed: added agent_id
except Exception as e: # noqa: BLE001
logging.getLogger(__name__).error(
"agent_response_callback error: %s", e
)

return _cb

@staticmethod
def _user_aware_streaming_callback(
user_id: str,
) -> Callable[[str, AgentRunResponseUpdate, bool], Awaitable[None]]:
"""Adapts streaming updates to existing streaming handler."""

async def _cb(agent_id: str, update: AgentRunResponseUpdate, is_final: bool):
try:
await streaming_agent_response_callback(agent_id, update, is_final, user_id) # Fixed: removed shim
except Exception as e: # noqa: BLE001
logging.getLogger(__name__).error(
"streaming_agent_response_callback error: %s", e
)

return _cb

# ---------------------------
# Orchestration construction
# ---------------------------
Expand All @@ -77,6 +51,7 @@ async def init_orchestration(cls, agents: List, user_id: str | None = None):
- Provided agents (participants)
- HumanApprovalMagenticManager as orchestrator manager
- AzureAIAgentClient as the underlying chat client
- Event-based callbacks for streaming and final responses

This mirrors the old Semantic Kernel orchestration setup:
- Uses same deployment, endpoint, and credentials
Expand Down Expand Up @@ -135,41 +110,58 @@ async def init_orchestration(cls, agents: List, user_id: str | None = None):
participants[name] = ag
cls.logger.debug("Added participant '%s'", name)

# Assemble workflow
# Create unified event callback for Magentic workflow
async def on_event(event) -> None:
"""
Handle all Magentic workflow events and route them to appropriate handlers.
This replaces the old callback attachment approach with the proper event-based system.
"""
try:
if isinstance(event, MagenticOrchestratorMessageEvent):
# Orchestrator messages (task assignments, coordination)
message_text = getattr(event.message, 'text', '')
cls.logger.info(f"[ORCHESTRATOR:{event.kind}] {message_text}")

elif isinstance(event, MagenticAgentDeltaEvent):
# Streaming update from agent - convert to our format
# MagenticAgentDeltaEvent has: agent_id, text, and other properties
try:
await streaming_agent_response_callback(
event.agent_id,
event, # Pass the event itself as the update object
False, # Not final yet (streaming in progress)
user_id
)
except Exception as e:
cls.logger.error(f"Error in streaming callback for agent {event.agent_id}: {e}")

elif isinstance(event, MagenticAgentMessageEvent):
# Final agent message - complete response
if event.message:
try:
agent_response_callback(event.agent_id, event.message, user_id)
except Exception as e:
cls.logger.error(f"Error in agent callback for agent {event.agent_id}: {e}")

elif isinstance(event, MagenticFinalResultEvent):
# Final result from the entire workflow
final_text = getattr(event.message, 'text', '')
cls.logger.info(f"[FINAL RESULT] Length: {len(final_text)} chars")

except Exception as e:
cls.logger.error(f"Error in on_event callback: {e}", exc_info=True)

# Assemble workflow with .on_event() callback (proper way for agent_framework)
builder = (
MagenticBuilder()
.participants(**participants)
.on_event(on_event, mode=MagenticCallbackMode.STREAMING) # Enable streaming events
.with_standard_manager(manager=manager)
)

# Build workflow
workflow = builder.build()
cls.logger.info("Built Magentic workflow with %d participants", len(participants))

# Wire agent response callbacks onto orchestrator
try:
orchestrator = getattr(workflow, "_orchestrator", None)
if orchestrator:
if getattr(orchestrator, "_agent_response_callback", None) is None:
setattr(
orchestrator,
"_agent_response_callback",
cls._user_aware_agent_callback(user_id),
)
if (
getattr(orchestrator, "_streaming_agent_response_callback", None)
is None
):
setattr(
orchestrator,
"_streaming_agent_response_callback",
cls._user_aware_streaming_callback(user_id),
)
cls.logger.debug("Attached callbacks to workflow orchestrator")
except Exception as e:
cls.logger.warning(
"Could not attach callbacks to workflow orchestrator: %s", e
)
cls.logger.info("Built Magentic workflow with %d participants and event callbacks", len(participants))

return workflow

Expand Down Expand Up @@ -254,7 +246,12 @@ async def run_orchestration(self, user_id: str, input_task) -> None:
async for event in workflow.run_stream(task_text):
# Check if this is the final output event
if isinstance(event, WorkflowOutputEvent):
final_output = str(event.data)
# Extract text from ChatMessage object
output_data = event.data
if isinstance(output_data, ChatMessage):
final_output = getattr(output_data, "text", None) or str(output_data)
else:
final_output = str(output_data)
self.logger.debug("Received workflow output event")

# Extract final result
Expand Down
Loading