Skip to content

Commit 54ce432

Browse files
committed
Expose stats
1 parent 901442b commit 54ce432

File tree

4 files changed

+280
-51
lines changed

4 files changed

+280
-51
lines changed

tasktiger/stats.py

Lines changed: 66 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,58 +1,72 @@
11
import threading
22
import time
3-
from typing import TYPE_CHECKING, Optional
3+
from typing import TYPE_CHECKING, Callable, Optional
44

5-
from ._internal import g_fork_lock
5+
from ._internal import g_fork_lock, import_attribute
66

77
if TYPE_CHECKING:
8-
from .worker import Worker
8+
from .tasktiger import TaskTiger
99

1010

11-
class StatsThread(threading.Thread):
12-
def __init__(self, tiger: "Worker") -> None:
13-
super(StatsThread, self).__init__()
11+
class Stats:
12+
def __init__(
13+
self,
14+
tiger: "TaskTiger",
15+
callback: Optional[Callable[[float], None]] = None,
16+
) -> None:
17+
super().__init__()
18+
1419
self.tiger = tiger
15-
self._stop_event = threading.Event()
1620

17-
self._task_running = False
21+
self._logging_thread: Optional[StatsLoggingThread] = None
22+
1823
self._time_start = time.monotonic()
19-
self._time_busy: float = 0.0
24+
self._time_busy = 0.0
2025
self._task_start_time: Optional[float] = None
21-
self.daemon = True # Exit process if main thread exits unexpectedly
2226

2327
# Lock that protects stats computations from interleaving. For example,
2428
# we don't want report_task_start() to run at the same time as
25-
# compute_stats(), as it might result in an inconsistent state.
26-
self._computation_lock = threading.Lock()
29+
# log(), as it might result in an inconsistent state.
30+
self._lock = threading.Lock()
31+
32+
# Callback to receive the duration of each completed task.
33+
self._callback = (
34+
import_attribute(callback)
35+
if (callback := self.tiger.config["STATS_CALLBACK"])
36+
else None
37+
)
2738

2839
def report_task_start(self) -> None:
2940
now = time.monotonic()
30-
with self._computation_lock:
41+
with self._lock:
3142
self._task_start_time = now
32-
self._task_running = True
3343

3444
def report_task_end(self) -> None:
45+
assert self._task_start_time
3546
now = time.monotonic()
36-
with self._computation_lock:
37-
assert self._task_start_time is not None
38-
self._time_busy += now - self._task_start_time
39-
self._task_running = False
47+
48+
if self._callback:
49+
self._callback(now - self._task_start_time)
50+
51+
with self._lock:
52+
self._record_time_busy(now)
4053
self._task_start_time = None
4154

42-
def compute_stats(self) -> None:
55+
def log(self) -> None:
4356
now = time.monotonic()
4457

45-
with self._computation_lock:
58+
with self._lock:
4659
time_total = now - self._time_start
4760
time_busy = self._time_busy
61+
62+
if self._task_start_time is not None:
63+
# Add busy time for the currently running task
64+
self._record_time_busy(now)
65+
66+
time_busy = self._time_busy
67+
4868
self._time_start = now
4969
self._time_busy = 0
50-
if self._task_running:
51-
assert self._task_start_time is not None
52-
time_busy += now - self._task_start_time
53-
self._task_start_time = now
54-
else:
55-
self._task_start_time = None
5670

5771
if time_total:
5872
utilization = 100.0 / time_total * time_busy
@@ -64,9 +78,34 @@ def compute_stats(self) -> None:
6478
utilization=utilization,
6579
)
6680

81+
def start_logging_thread(self) -> None:
82+
if not self._logging_thread:
83+
self._logging_thread = StatsLoggingThread(self)
84+
self._logging_thread.start()
85+
86+
def stop_logging_thread(self) -> None:
87+
if self._logging_thread:
88+
self._logging_thread.stop()
89+
self._logging_thread = None
90+
91+
def _record_time_busy(self, now: float) -> None:
92+
assert self._task_start_time
93+
self._time_busy += now - max(self._task_start_time, self._time_start)
94+
95+
96+
class StatsLoggingThread(threading.Thread):
97+
def __init__(self, stats: Stats) -> None:
98+
super().__init__()
99+
100+
self.tiger = stats.tiger
101+
self._stats: Stats = stats
102+
self._stop_event = threading.Event()
103+
104+
self.daemon = True # Exit process if main thread exits unexpectedly
105+
67106
def run(self) -> None:
68107
while not self._stop_event.wait(self.tiger.config["STATS_INTERVAL"]):
69-
self.compute_stats()
108+
self._stats.log()
70109

71110
def stop(self) -> None:
72111
self._stop_event.set()

tasktiger/tasktiger.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,12 @@ def init(
198198
"BATCH_QUEUES": {},
199199
# How often to print stats.
200200
"STATS_INTERVAL": 60,
201+
# The function to call with the duration of each completed task.
202+
# This can be useful to measure the worker's utilisation %.
203+
# For example, the worker's utilisation over the last 30 minutes
204+
# can be obtained by dividing the sum of task durations reported
205+
# over the last 30 minutes by 30 minutes.
206+
"STATS_CALLBACK": None,
201207
# Single worker queues can reduce redis activity in some use cases
202208
# by locking at the queue level instead of just at the task or task
203209
# group level. These queues will only allow a single worker to

tasktiger/worker.py

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@
5555
)
5656
from .redis_semaphore import Semaphore
5757
from .runner import get_runner_class
58-
from .stats import StatsThread
58+
from .stats import Stats
5959
from .task import Task
6060
from .timeouts import JobTimeoutException
6161
from .utils import redis_glob_escape
@@ -107,7 +107,7 @@ def __init__(
107107
self._key = tiger._key
108108
self._did_work = True
109109
self._last_task_check = 0.0
110-
self.stats_thread: Optional[StatsThread] = None
110+
self.stats = Stats(self.tiger)
111111
self.id = str(uuid.uuid4())
112112

113113
if queues:
@@ -1008,13 +1008,11 @@ def _execute_task_group(
10081008
if not ready_tasks:
10091009
return True, []
10101010

1011-
if self.stats_thread:
1012-
self.stats_thread.report_task_start()
1011+
self.stats.report_task_start()
10131012
success = self._execute(
10141013
queue, ready_tasks, log, locks, queue_lock, all_task_ids
10151014
)
1016-
if self.stats_thread:
1017-
self.stats_thread.report_task_end()
1015+
self.stats.report_task_end()
10181016

10191017
for lock in locks:
10201018
try:
@@ -1280,9 +1278,7 @@ def run(self, once: bool = False, force_once: bool = False) -> None:
12801278
self.log.warn("using old Redis version")
12811279

12821280
if self.config["STATS_INTERVAL"]:
1283-
stats_thread = StatsThread(self)
1284-
self.stats_thread = stats_thread
1285-
stats_thread.start()
1281+
self.stats.start_logging_thread()
12861282

12871283
# Queue any periodic tasks that are not queued yet.
12881284
self._queue_periodic_tasks()
@@ -1328,9 +1324,7 @@ def run(self, once: bool = False, force_once: bool = False) -> None:
13281324
raise
13291325

13301326
finally:
1331-
if self.stats_thread:
1332-
self.stats_thread.stop()
1333-
self.stats_thread = None
1327+
self.stats.stop_logging_thread()
13341328

13351329
# Free up Redis connection
13361330
if self._pubsub:

0 commit comments

Comments
 (0)