|
8 | 8 | from __future__ import annotations |
9 | 9 |
|
10 | 10 | import argparse |
| 11 | +import gzip |
11 | 12 | import json |
12 | 13 | import os |
13 | 14 | import shutil |
14 | 15 | import sqlite3 |
| 16 | +import subprocess |
15 | 17 | import sys |
16 | 18 | import threading |
17 | 19 | import uuid |
@@ -203,27 +205,84 @@ def _execute_run(run_id: str, payload: RunRequest, cancel_event: threading.Event |
203 | 205 | ) |
204 | 206 | RUN_STORE.upsert(RUNS[run_id]) |
205 | 207 |
|
| 208 | + json_output_path = UPLOAD_DIR / f"{run_id}_metrics.json.gz" |
| 209 | + |
206 | 210 | try: |
207 | | - runner = ScriptRunner( |
208 | | - payload.script_path, |
209 | | - script_args=payload.args, |
210 | | - timeout=payload.timeout, |
211 | | - log_level=payload.log_level, |
212 | | - history_db=payload.history_db, |
213 | | - enable_history=payload.enable_history, |
214 | | - ) |
215 | | - RUN_HANDLES[run_id]["runner"] = runner |
| 211 | + # Build command to run runner.py |
| 212 | + cmd = [sys.executable, str(PROJECT_ROOT / "runner.py")] |
| 213 | + |
| 214 | + if payload.timeout: |
| 215 | + cmd.extend(["--timeout", str(payload.timeout)]) |
| 216 | + |
| 217 | + cmd.extend(["--log-level", payload.log_level]) |
| 218 | + |
| 219 | + if payload.history_db: |
| 220 | + cmd.extend(["--history-db", payload.history_db]) |
| 221 | + |
| 222 | + if not payload.enable_history: |
| 223 | + cmd.append("--disable-history") |
| 224 | + |
| 225 | + if payload.retry_on_failure: |
| 226 | + cmd.extend(["--retry", "3"]) |
| 227 | + |
| 228 | + # Use JSON output to capture metrics |
| 229 | + cmd.extend(["--json-output", str(json_output_path)]) |
| 230 | + |
| 231 | + # Script and args |
| 232 | + cmd.append(payload.script_path) |
| 233 | + cmd.extend(payload.args) |
216 | 234 |
|
217 | 235 | if cancel_event.is_set(): |
218 | 236 | raise RuntimeError("Run cancelled before start") |
219 | 237 |
|
220 | | - result = runner.run_script(retry_on_failure=payload.retry_on_failure) |
| 238 | + process = subprocess.Popen( |
| 239 | + cmd, |
| 240 | + stdout=subprocess.PIPE, |
| 241 | + stderr=subprocess.PIPE, |
| 242 | + text=True, |
| 243 | + cwd=PROJECT_ROOT |
| 244 | + ) |
| 245 | + |
| 246 | + RUN_HANDLES[run_id]["process"] = process |
| 247 | + |
| 248 | + # Wait for completion or cancellation |
| 249 | + while process.poll() is None: |
| 250 | + if cancel_event.is_set(): |
| 251 | + process.terminate() |
| 252 | + try: |
| 253 | + process.wait(timeout=5) |
| 254 | + except subprocess.TimeoutExpired: |
| 255 | + process.kill() |
| 256 | + raise RuntimeError("Run cancelled by user") |
| 257 | + threading.Event().wait(0.1) # Sleep briefly |
| 258 | + |
| 259 | + stdout, stderr = process.communicate() |
| 260 | + returncode = process.returncode |
| 261 | + |
| 262 | + # Read metrics from JSON output |
| 263 | + metrics = {} |
| 264 | + if json_output_path.exists(): |
| 265 | + try: |
| 266 | + with gzip.open(json_output_path, 'rt') as f: |
| 267 | + metrics = json.load(f) |
| 268 | + json_output_path.unlink() |
| 269 | + except Exception as e: |
| 270 | + print(f"Failed to read metrics: {e}") |
| 271 | + |
| 272 | + result = { |
| 273 | + "returncode": returncode, |
| 274 | + "stdout": stdout, |
| 275 | + "stderr": stderr, |
| 276 | + "metrics": metrics |
| 277 | + } |
| 278 | + |
221 | 279 | if cancel_event.is_set(): |
222 | 280 | status = "cancelled" |
223 | 281 | error = "Run cancelled by user" |
224 | 282 | else: |
225 | | - status = "completed" if result.get("returncode", 1) == 0 else "failed" |
| 283 | + status = "completed" if returncode == 0 else "failed" |
226 | 284 | error = None |
| 285 | + |
227 | 286 | finished_at = datetime.utcnow() |
228 | 287 | record = RunRecord( |
229 | 288 | id=run_id, |
@@ -253,6 +312,11 @@ def _execute_run(run_id: str, payload: RunRequest, cancel_event: threading.Event |
253 | 312 | RUN_STORE.upsert(record) |
254 | 313 | finally: |
255 | 314 | RUN_HANDLES.pop(run_id, None) |
| 315 | + if json_output_path.exists(): |
| 316 | + try: |
| 317 | + json_output_path.unlink() |
| 318 | + except: |
| 319 | + pass |
256 | 320 |
|
257 | 321 |
|
258 | 322 | def _validate_script_path(path_str: str) -> Path: |
@@ -291,7 +355,7 @@ def _queue_run(payload: RunRequest, background_tasks: BackgroundTasks) -> Dict[s |
291 | 355 | cancel_event = threading.Event() |
292 | 356 | with RUNS_LOCK: |
293 | 357 | RUNS[run_id] = record |
294 | | - RUN_HANDLES[run_id] = {"cancel_event": cancel_event, "runner": None} |
| 358 | + RUN_HANDLES[run_id] = {"cancel_event": cancel_event, "process": None} |
295 | 359 | RUN_STORE.upsert(record) |
296 | 360 |
|
297 | 361 | background_tasks.add_task(_execute_run, run_id, payload, cancel_event) |
@@ -380,9 +444,9 @@ def cancel_run(run_id: str) -> Dict[str, str]: |
380 | 444 | raise HTTPException(status_code=409, detail="Run already finished") |
381 | 445 | if handle: |
382 | 446 | handle["cancel_event"].set() |
383 | | - runner = handle.get("runner") |
384 | | - if runner: |
385 | | - runner.cancel_active_run() |
| 447 | + process = handle.get("process") |
| 448 | + if process: |
| 449 | + process.terminate() |
386 | 450 | finished_at = datetime.utcnow() |
387 | 451 | updated = RunRecord( |
388 | 452 | id=record.id, |
|
0 commit comments