Skip to content

Commit 04a4e60

Browse files
committed
Set final status to failed on exception
1 parent c2b1ca2 commit 04a4e60

File tree

2 files changed

+43
-4
lines changed

2 files changed

+43
-4
lines changed

graphdatascience/query_runner/progress/query_progress_logger.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ def _log(
8181
continue
8282

8383
if pbar is not None:
84-
self._finish_pbar(pbar)
84+
self._finish_pbar(future, pbar)
8585
# TODO show as cancelled if future was interrupted
8686

8787
def _init_pbar(self, task_with_progress: TaskWithProgress) -> tqdm: # type: ignore
@@ -119,7 +119,11 @@ def _update_pbar(self, pbar: tqdm, task_with_progress: TaskWithProgress) -> None
119119
else:
120120
pbar.refresh()
121121

122-
def _finish_pbar(self, pbar: tqdm) -> None: # type: ignore
122+
def _finish_pbar(self, future: Future[Any], pbar: tqdm) -> None: # type: ignore
123+
if future.exception():
124+
pbar.set_postfix_str("status: FAILED", refresh=True)
125+
return
126+
123127
if pbar.total is not None:
124128
pbar.update(pbar.total - pbar.n)
125129
pbar.set_postfix_str("status: FINISHED", refresh=True)

graphdatascience/tests/unit/query_runner/progress/test_query_progress_logger.py

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import re
22
import time
3+
from concurrent.futures import ThreadPoolExecutor
34
from io import StringIO
45
from typing import Optional
56

@@ -127,7 +128,10 @@ def simple_run_cypher(query: str, database: Optional[str] = None) -> DataFrame:
127128
running_output,
128129
), running_output
129130

130-
qpl._finish_pbar(pbar)
131+
with ThreadPoolExecutor() as executor:
132+
future = executor.submit(lambda: None)
133+
qpl._finish_pbar(future, pbar)
134+
131135
finished_output = pbarOutputStream.getvalue().split("\r")[-1]
132136
assert re.match(
133137
r"test task: 100%\|##########\| 100.0/100 \[00:00<00:00, \d+.\d+%/s, status: FINISHED\]", finished_output
@@ -149,7 +153,10 @@ def simple_run_cypher(query: str, database: Optional[str] = None) -> DataFrame:
149153

150154
qpl._update_pbar(pbar, TaskWithProgress("test task", "n/a", "PENDING", ""))
151155
qpl._update_pbar(pbar, TaskWithProgress("test task", "", "RUNNING", "root 1/1::leaf"))
152-
qpl._finish_pbar(pbar)
156+
157+
with ThreadPoolExecutor() as executor:
158+
future = executor.submit(lambda: None)
159+
qpl._finish_pbar(future, pbar)
153160

154161
assert pbarOutputStream.getvalue().rstrip() == "".join(
155162
[
@@ -160,6 +167,34 @@ def simple_run_cypher(query: str, database: Optional[str] = None) -> DataFrame:
160167
)
161168

162169

170+
def test_progress_bar_with_failing_query() -> None:
171+
def simple_run_cypher(query: str, database: Optional[str] = None) -> DataFrame:
172+
raise NotImplementedError("Should not be called!")
173+
174+
def failing_runnable() -> DataFrame:
175+
raise NotImplementedError("Should not be called!")
176+
177+
with StringIO() as pbarOutputStream:
178+
qpl = QueryProgressLogger(
179+
simple_run_cypher,
180+
lambda: ServerVersion(3, 0, 0),
181+
progress_bar_options={"file": pbarOutputStream, "mininterval": 100},
182+
)
183+
184+
with ThreadPoolExecutor() as executor:
185+
future = executor.submit(failing_runnable)
186+
187+
pbar = qpl._init_pbar(TaskWithProgress("test task", "n/a", "PENDING", ""))
188+
qpl._finish_pbar(future, pbar)
189+
190+
assert pbarOutputStream.getvalue().rstrip() == "".join(
191+
[
192+
"\rtest task [elapsed: 00:00 ]",
193+
"\rtest task [elapsed: 00:00 , status: FAILED]",
194+
]
195+
)
196+
197+
163198
def test_uses_static_store() -> None:
164199
def fake_run_cypher(query: str, database: Optional[str] = None) -> DataFrame:
165200
return DataFrame([{"progress": "n/a", "taskName": "Test task", "status": "RUNNING"}])

0 commit comments

Comments
 (0)