diff --git a/src/backend/af/orchestration/orchestration_manager.py b/src/backend/af/orchestration/orchestration_manager.py index 78b87562..17bca2f7 100644 --- a/src/backend/af/orchestration/orchestration_manager.py +++ b/src/backend/af/orchestration/orchestration_manager.py @@ -3,12 +3,20 @@ import asyncio import logging import uuid -from typing import List, Optional, Callable, Awaitable +from typing import List, Optional # agent_framework imports from agent_framework_azure_ai import AzureAIAgentClient -from agent_framework import ChatMessage, ChatOptions, WorkflowOutputEvent, AgentRunResponseUpdate, MagenticBuilder - +from agent_framework import ( + ChatMessage, + WorkflowOutputEvent, + MagenticBuilder, + MagenticCallbackMode, + MagenticOrchestratorMessageEvent, + MagenticAgentDeltaEvent, + MagenticAgentMessageEvent, + MagenticFinalResultEvent, +) from common.config.app_config import config from common.models.messages_af import TeamConfiguration @@ -23,6 +31,7 @@ from af.orchestration.human_approval_manager import HumanApprovalMagenticManager from af.magentic_agents.magentic_agent_factory import MagenticAgentFactory + class OrchestrationManager: """Manager for handling orchestration logic using agent_framework Magentic workflow.""" @@ -32,41 +41,6 @@ def __init__(self): self.user_id: Optional[str] = None self.logger = self.__class__.logger - # --------------------------- - # Internal callback adapters - # --------------------------- - @staticmethod - def _user_aware_agent_callback( - user_id: str, - ) -> Callable[[str, ChatMessage], Awaitable[None]]: - """Adapts agent_framework final agent ChatMessage to legacy agent_response_callback signature.""" - - async def _cb(agent_id: str, message: ChatMessage): - try: - agent_response_callback(agent_id, message, user_id) # Fixed: added agent_id - except Exception as e: # noqa: BLE001 - logging.getLogger(__name__).error( - "agent_response_callback error: %s", e - ) - - return _cb - - @staticmethod - def _user_aware_streaming_callback( - user_id: str, - ) -> Callable[[str, AgentRunResponseUpdate, bool], Awaitable[None]]: - """Adapts streaming updates to existing streaming handler.""" - - async def _cb(agent_id: str, update: AgentRunResponseUpdate, is_final: bool): - try: - await streaming_agent_response_callback(agent_id, update, is_final, user_id) # Fixed: removed shim - except Exception as e: # noqa: BLE001 - logging.getLogger(__name__).error( - "streaming_agent_response_callback error: %s", e - ) - - return _cb - # --------------------------- # Orchestration construction # --------------------------- @@ -77,6 +51,7 @@ async def init_orchestration(cls, agents: List, user_id: str | None = None): - Provided agents (participants) - HumanApprovalMagenticManager as orchestrator manager - AzureAIAgentClient as the underlying chat client + - Event-based callbacks for streaming and final responses This mirrors the old Semantic Kernel orchestration setup: - Uses same deployment, endpoint, and credentials @@ -135,41 +110,58 @@ async def init_orchestration(cls, agents: List, user_id: str | None = None): participants[name] = ag cls.logger.debug("Added participant '%s'", name) - # Assemble workflow + # Create unified event callback for Magentic workflow + async def on_event(event) -> None: + """ + Handle all Magentic workflow events and route them to appropriate handlers. + This replaces the old callback attachment approach with the proper event-based system. + """ + try: + if isinstance(event, MagenticOrchestratorMessageEvent): + # Orchestrator messages (task assignments, coordination) + message_text = getattr(event.message, 'text', '') + cls.logger.info(f"[ORCHESTRATOR:{event.kind}] {message_text}") + + elif isinstance(event, MagenticAgentDeltaEvent): + # Streaming update from agent - convert to our format + # MagenticAgentDeltaEvent has: agent_id, text, and other properties + try: + await streaming_agent_response_callback( + event.agent_id, + event, # Pass the event itself as the update object + False, # Not final yet (streaming in progress) + user_id + ) + except Exception as e: + cls.logger.error(f"Error in streaming callback for agent {event.agent_id}: {e}") + + elif isinstance(event, MagenticAgentMessageEvent): + # Final agent message - complete response + if event.message: + try: + agent_response_callback(event.agent_id, event.message, user_id) + except Exception as e: + cls.logger.error(f"Error in agent callback for agent {event.agent_id}: {e}") + + elif isinstance(event, MagenticFinalResultEvent): + # Final result from the entire workflow + final_text = getattr(event.message, 'text', '') + cls.logger.info(f"[FINAL RESULT] Length: {len(final_text)} chars") + + except Exception as e: + cls.logger.error(f"Error in on_event callback: {e}", exc_info=True) + + # Assemble workflow with .on_event() callback (proper way for agent_framework) builder = ( MagenticBuilder() .participants(**participants) + .on_event(on_event, mode=MagenticCallbackMode.STREAMING) # Enable streaming events .with_standard_manager(manager=manager) ) # Build workflow workflow = builder.build() - cls.logger.info("Built Magentic workflow with %d participants", len(participants)) - - # Wire agent response callbacks onto orchestrator - try: - orchestrator = getattr(workflow, "_orchestrator", None) - if orchestrator: - if getattr(orchestrator, "_agent_response_callback", None) is None: - setattr( - orchestrator, - "_agent_response_callback", - cls._user_aware_agent_callback(user_id), - ) - if ( - getattr(orchestrator, "_streaming_agent_response_callback", None) - is None - ): - setattr( - orchestrator, - "_streaming_agent_response_callback", - cls._user_aware_streaming_callback(user_id), - ) - cls.logger.debug("Attached callbacks to workflow orchestrator") - except Exception as e: - cls.logger.warning( - "Could not attach callbacks to workflow orchestrator: %s", e - ) + cls.logger.info("Built Magentic workflow with %d participants and event callbacks", len(participants)) return workflow @@ -254,7 +246,12 @@ async def run_orchestration(self, user_id: str, input_task) -> None: async for event in workflow.run_stream(task_text): # Check if this is the final output event if isinstance(event, WorkflowOutputEvent): - final_output = str(event.data) + # Extract text from ChatMessage object + output_data = event.data + if isinstance(output_data, ChatMessage): + final_output = getattr(output_data, "text", None) or str(output_data) + else: + final_output = str(output_data) self.logger.debug("Received workflow output event") # Extract final result