|
2 | 2 | import logging
|
3 | 3 | import os
|
4 | 4 | import pickle
|
| 5 | +from cgitb import handler |
5 | 6 | from contextlib import suppress
|
6 | 7 | from typing import Optional, cast
|
7 | 8 |
|
8 | 9 | from llama_index.core.workflow import (
|
9 | 10 | Context,
|
10 | 11 | HumanResponseEvent,
|
11 | 12 | InputRequiredEvent,
|
12 |
| - JsonPickleSerializer, |
| 13 | + JsonPickleSerializer, StartEvent, |
13 | 14 | )
|
14 | 15 | from llama_index.core.workflow.errors import WorkflowTimeoutError
|
15 | 16 | from llama_index.core.workflow.handler import WorkflowHandler
|
@@ -87,30 +88,12 @@ async def execute(self) -> Optional[UiPathRuntimeResult]:
|
87 | 88 | if self.context.workflow_context is None:
|
88 | 89 | return None
|
89 | 90 |
|
90 |
| - handler: WorkflowHandler = self.context.workflow.run( |
91 |
| - start_event=ev if self.context.resume else None, |
92 |
| - ctx=self.context.workflow_context, |
93 |
| - **(self.context.input_json or {}), |
94 |
| - ) |
95 |
| - |
96 | 91 | resume_trigger: Optional[UiPathResumeTrigger] = None
|
97 | 92 |
|
98 |
| - response_applied = False |
99 |
| - async for event in handler.stream_events(): |
100 |
| - # log the received event on trace level |
101 |
| - if isinstance(event, InputRequiredEvent): |
102 |
| - # for api trigger hitl scenarios only pass the str input for processing |
103 |
| - hitl_processor = HitlProcessor(value=event.prefix) |
104 |
| - if self.context.resume and not response_applied: |
105 |
| - # If we are resuming, we need to apply the response to the event stream. |
106 |
| - response_applied = True |
107 |
| - response_event = await self.get_response_event() |
108 |
| - if response_event: |
109 |
| - # If we have a response event, send it to the workflow context. |
110 |
| - self.context.workflow_context.send_event(response_event) |
111 |
| - else: |
112 |
| - resume_trigger = await hitl_processor.create_resume_trigger() |
113 |
| - break |
| 93 | + if self.context.debug and os.getenv("VSCODE_EXPLORER_EXECUTION") == "True": |
| 94 | + handler = await self.debug_workflow() |
| 95 | + else: |
| 96 | + handler, resume_trigger = await self.run_workflow(ev) |
114 | 97 |
|
115 | 98 | if resume_trigger is None:
|
116 | 99 | try:
|
@@ -179,6 +162,63 @@ async def execute(self) -> Optional[UiPathRuntimeResult]:
|
179 | 162 | finally:
|
180 | 163 | self.trace_provider.shutdown()
|
181 | 164 |
|
| 165 | + async def debug_workflow(self): |
| 166 | + try: |
| 167 | + import debugpy |
| 168 | + except ImportError: |
| 169 | + raise Exception("DebugPy not available") |
| 170 | + |
| 171 | + handler: WorkflowHandler = self.context.workflow.run( |
| 172 | + stepwise=True, |
| 173 | + **(self.context.input_json or {}), |
| 174 | + ) |
| 175 | + # if we are in debug mode, emit all events |
| 176 | + while produced_events := await handler.run_step(): |
| 177 | + for ev in produced_events: |
| 178 | + debug_file_path = os.environ.get('VSCODE_EXTENSION_TEMP_DEBUG_FILE') |
| 179 | + if debug_file_path: |
| 180 | + # Write the debug data |
| 181 | + debug_data = { |
| 182 | + 'llama_event': self._serialize_object(ev) |
| 183 | + } |
| 184 | + with open(debug_file_path, 'w') as f: |
| 185 | + json.dump(debug_data, f) |
| 186 | + |
| 187 | + # Trigger breakpoint |
| 188 | + debugpy.breakpoint() |
| 189 | + |
| 190 | + # Send the event |
| 191 | + handler.ctx.send_event(ev) |
| 192 | + return handler |
| 193 | + |
| 194 | + async def run_workflow(self, start_event :type[StartEvent]) -> (WorkflowHandler, Optional[UiPathResumeTrigger]): |
| 195 | + resume_trigger: Optional[UiPathResumeTrigger] = None |
| 196 | + |
| 197 | + handler: WorkflowHandler = self.context.workflow.run( |
| 198 | + start_event=start_event if self.context.resume else None, |
| 199 | + ctx=self.context.workflow_context, |
| 200 | + **(self.context.input_json or {}), |
| 201 | + ) |
| 202 | + |
| 203 | + response_applied = False |
| 204 | + async for event in handler.stream_events(): |
| 205 | + # log the received event on trace level |
| 206 | + if isinstance(event, InputRequiredEvent): |
| 207 | + # for api trigger hitl scenarios only pass the str input for processing |
| 208 | + hitl_processor = HitlProcessor(value=event.prefix) |
| 209 | + if self.context.resume and not response_applied: |
| 210 | + # If we are resuming, we need to apply the response to the event stream. |
| 211 | + response_applied = True |
| 212 | + response_event = await self.get_response_event() |
| 213 | + if response_event: |
| 214 | + # If we have a response event, send it to the workflow context. |
| 215 | + self.context.workflow_context.send_event(response_event) |
| 216 | + else: |
| 217 | + resume_trigger = await hitl_processor.create_resume_trigger() |
| 218 | + break |
| 219 | + |
| 220 | + return handler, resume_trigger |
| 221 | + |
182 | 222 | async def validate(self) -> None:
|
183 | 223 | """Validate runtime inputs and load Llama agent configuration."""
|
184 | 224 | try:
|
|
0 commit comments