Skip to content

Commit 5fc01f6

Browse files
committed
Expose stats
1 parent 901442b commit 5fc01f6

File tree

4 files changed

+270
-47
lines changed

4 files changed

+270
-47
lines changed

tasktiger/stats.py

Lines changed: 56 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -2,57 +2,66 @@
22
import time
33
from typing import TYPE_CHECKING, Optional
44

5-
from ._internal import g_fork_lock
5+
from ._internal import g_fork_lock, import_attribute
66

77
if TYPE_CHECKING:
88
from .worker import Worker
99

1010

11-
class StatsThread(threading.Thread):
12-
def __init__(self, tiger: "Worker") -> None:
13-
super(StatsThread, self).__init__()
11+
class Stats:
12+
def __init__(self, tiger, callback=None) -> None:
13+
super().__init__()
14+
1415
self.tiger = tiger
15-
self._stop_event = threading.Event()
1616

17-
self._task_running = False
17+
self._logging_thread: Optional[StatsLoggingThread] = None
18+
1819
self._time_start = time.monotonic()
19-
self._time_busy: float = 0.0
20-
self._task_start_time: Optional[float] = None
21-
self.daemon = True # Exit process if main thread exits unexpectedly
20+
self._time_busy = 0
21+
self._task_start_time = None
2222

2323
# Lock that protects stats computations from interleaving. For example,
2424
# we don't want report_task_start() to run at the same time as
25-
# compute_stats(), as it might result in an inconsistent state.
25+
# log(), as it might result in an inconsistent state.
2626
self._computation_lock = threading.Lock()
2727

28-
def report_task_start(self) -> None:
28+
# Callback to receive the duration of each completed task.
29+
self._callback = (
30+
import_attribute(callback)
31+
if (callback := self.tiger.config["STATS_CALLBACK"])
32+
else None
33+
)
34+
35+
def report_task_start(self):
2936
now = time.monotonic()
3037
with self._computation_lock:
3138
self._task_start_time = now
32-
self._task_running = True
3339

3440
def report_task_end(self) -> None:
3541
now = time.monotonic()
42+
43+
if self._callback:
44+
self._callback(now - self._task_start_time)
45+
3646
with self._computation_lock:
37-
assert self._task_start_time is not None
38-
self._time_busy += now - self._task_start_time
39-
self._task_running = False
47+
self._record_time_busy(now)
4048
self._task_start_time = None
4149

42-
def compute_stats(self) -> None:
50+
def log(self) -> None:
4351
now = time.monotonic()
4452

4553
with self._computation_lock:
4654
time_total = now - self._time_start
4755
time_busy = self._time_busy
56+
57+
if self._task_start_time is not None:
58+
# Add busy time for the currently running task
59+
self._record_time_busy(now)
60+
61+
time_busy = self._time_busy
62+
4863
self._time_start = now
4964
self._time_busy = 0
50-
if self._task_running:
51-
assert self._task_start_time is not None
52-
time_busy += now - self._task_start_time
53-
self._task_start_time = now
54-
else:
55-
self._task_start_time = None
5665

5766
if time_total:
5867
utilization = 100.0 / time_total * time_busy
@@ -64,9 +73,33 @@ def compute_stats(self) -> None:
6473
utilization=utilization,
6574
)
6675

76+
def start_logging_thread(self):
77+
if not self._logging_thread:
78+
self._logging_thread = StatsLoggingThread(self)
79+
self._logging_thread.start()
80+
81+
def stop_logging_thread(self):
82+
if self._logging_thread:
83+
self._logging_thread.stop()
84+
self._logging_thread = None
85+
86+
def _record_time_busy(self, now: float) -> None:
87+
self._time_busy += now - max(self._task_start_time, self._time_start)
88+
89+
90+
class StatsLoggingThread(threading.Thread):
91+
def __init__(self, stats) -> None:
92+
super().__init__()
93+
94+
self.tiger = stats.tiger
95+
self._stats: Stats = stats
96+
self._stop_event = threading.Event()
97+
98+
self.daemon = True # Exit process if main thread exits unexpectedly
99+
67100
def run(self) -> None:
68101
while not self._stop_event.wait(self.tiger.config["STATS_INTERVAL"]):
69-
self.compute_stats()
102+
self._stats.log()
70103

71104
def stop(self) -> None:
72105
self._stop_event.set()

tasktiger/tasktiger.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,12 @@ def init(
198198
"BATCH_QUEUES": {},
199199
# How often to print stats.
200200
"STATS_INTERVAL": 60,
201+
# The function (resolved from this value via import_attribute) to call with the duration, in seconds, of each completed task.
202+
# This can be useful to measure the worker's utilization percentage.
203+
# For example, the worker's utilization over the last 30 minutes
204+
# can be obtained by dividing the sum of task durations reported
205+
# over the last 30 minutes by 30 minutes.
206+
"STATS_CALLBACK": None,
201207
# Single worker queues can reduce redis activity in some use cases
202208
# by locking at the queue level instead of just at the task or task
203209
# group level. These queues will only allow a single worker to

tasktiger/worker.py

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@
5555
)
5656
from .redis_semaphore import Semaphore
5757
from .runner import get_runner_class
58-
from .stats import StatsThread
58+
from .stats import Stats
5959
from .task import Task
6060
from .timeouts import JobTimeoutException
6161
from .utils import redis_glob_escape
@@ -107,7 +107,7 @@ def __init__(
107107
self._key = tiger._key
108108
self._did_work = True
109109
self._last_task_check = 0.0
110-
self.stats_thread: Optional[StatsThread] = None
110+
self.stats = Stats(self)
111111
self.id = str(uuid.uuid4())
112112

113113
if queues:
@@ -1008,13 +1008,11 @@ def _execute_task_group(
10081008
if not ready_tasks:
10091009
return True, []
10101010

1011-
if self.stats_thread:
1012-
self.stats_thread.report_task_start()
1011+
self.stats.report_task_start()
10131012
success = self._execute(
10141013
queue, ready_tasks, log, locks, queue_lock, all_task_ids
10151014
)
1016-
if self.stats_thread:
1017-
self.stats_thread.report_task_end()
1015+
self.stats.report_task_end()
10181016

10191017
for lock in locks:
10201018
try:
@@ -1280,9 +1278,7 @@ def run(self, once: bool = False, force_once: bool = False) -> None:
12801278
self.log.warn("using old Redis version")
12811279

12821280
if self.config["STATS_INTERVAL"]:
1283-
stats_thread = StatsThread(self)
1284-
self.stats_thread = stats_thread
1285-
stats_thread.start()
1281+
self.stats.start_logging_thread()
12861282

12871283
# Queue any periodic tasks that are not queued yet.
12881284
self._queue_periodic_tasks()
@@ -1328,9 +1324,7 @@ def run(self, once: bool = False, force_once: bool = False) -> None:
13281324
raise
13291325

13301326
finally:
1331-
if self.stats_thread:
1332-
self.stats_thread.stop()
1333-
self.stats_thread = None
1327+
self.stats.stop_logging_thread()
13341328

13351329
# Free up Redis connection
13361330
if self._pubsub:

0 commit comments

Comments
 (0)