Skip to content

Commit 41c2d1f

Browse files
dereuromarkclaude
andcommitted
Add queue job lifecycle events for monitoring integration
Add events at key lifecycle points to enable integration with monitoring tools like Sentry: - Queue.Job.created: Fired when a job is added to the queue (producer) - Queue.Job.started: Fired when a worker begins processing a job (consumer) - Queue.Job.completed: Fired when a job finishes successfully - Queue.Job.failed: Fired on every job failure (not just when exhausted) These events provide the necessary hooks for implementing Sentry's queue monitoring feature or similar monitoring tools. The job entity included in each event provides all data needed for tracing (job ID, task name, payload, attempts, timestamps). Uses $this->dispatchEvent() for Table class instead of EventManager::instance(). Refs: LordSimal/cakephp-sentry#17 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
1 parent cb2221a commit 41c2d1f

File tree

5 files changed

+224
-2
lines changed

5 files changed

+224
-2
lines changed

docs/sections/misc.md

Lines changed: 70 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,64 @@ This includes also failed ones if not filtered further using `where()` condition
5151

5252
## Events
5353
The Queue plugin dispatches events to allow you to hook into the queue processing lifecycle.
54+
These events are useful for monitoring, logging, and integrating with external services like Sentry.
55+
56+
### Queue.Job.created
57+
This event is triggered when a new job is added to the queue (producer side).
58+
59+
```php
60+
use Cake\Event\EventInterface;
61+
use Cake\Event\EventManager;
62+
63+
EventManager::instance()->on('Queue.Job.created', function (EventInterface $event) {
64+
$job = $event->getData('job');
65+
// Track job creation for monitoring
66+
});
67+
```
68+
69+
Event data:
70+
- `job`: The `QueuedJob` entity that was created
71+
72+
### Queue.Job.started
73+
This event is triggered when a worker begins processing a job (consumer side).
74+
75+
```php
76+
EventManager::instance()->on('Queue.Job.started', function (EventInterface $event) {
77+
$job = $event->getData('job');
78+
// Start tracing/monitoring span
79+
});
80+
```
81+
82+
Event data:
83+
- `job`: The `QueuedJob` entity being processed
84+
85+
### Queue.Job.completed
86+
This event is triggered when a job finishes successfully.
87+
88+
```php
89+
EventManager::instance()->on('Queue.Job.completed', function (EventInterface $event) {
90+
$job = $event->getData('job');
91+
// Mark trace as successful
92+
});
93+
```
94+
95+
Event data:
96+
- `job`: The `QueuedJob` entity that completed
97+
98+
### Queue.Job.failed
99+
This event is triggered when a job fails (on every failure attempt).
100+
101+
```php
102+
EventManager::instance()->on('Queue.Job.failed', function (EventInterface $event) {
103+
$job = $event->getData('job');
104+
$failureMessage = $event->getData('failureMessage');
105+
// Mark trace as failed, log error
106+
});
107+
```
108+
109+
Event data:
110+
- `job`: The `QueuedJob` entity that failed
111+
- `failureMessage`: The error message from the failure
54112

55113
### Queue.Job.maxAttemptsExhausted
56114
This event is triggered when a job has failed and exhausted all of its configured retry attempts.
@@ -81,10 +139,21 @@ EventManager::instance()->on('Queue.Job.maxAttemptsExhausted', function (EventIn
81139
});
82140
```
83141

84-
The event data contains:
142+
Event data:
85143
- `job`: The `QueuedJob` entity that failed
86144
- `failureMessage`: The error message from the last failure
87145

146+
### Monitoring Integration (Sentry, etc.)
147+
148+
These events enable integration with monitoring tools like Sentry's queue monitoring feature.
149+
The job entity provides all necessary data for tracing:
150+
151+
- `$job->id` - Message identifier (`messaging.message.id`)
152+
- `$job->job_task` - Queue/topic name (`messaging.destination.name`)
153+
- `$job->data` - Payload for calculating message size (`messaging.message.body.size`)
154+
- `$job->attempts` - Retry count (`messaging.message.retry.count`)
155+
- `$job->created`, `$job->notbefore`, `$job->fetched` - For calculating receive latency (`messaging.message.receive.latency`)
156+
88157
## Notes
89158

90159
`<TaskName>` is the complete class name without the Task suffix (e.g. Example or PluginName.Example).

src/Model/Table/QueuedJobsTable.php

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,8 +216,13 @@ public function createJob(string $jobTask, array|object|null $data = null, array
216216
}
217217

218218
$queuedJob = $this->newEntity($queuedJob);
219+
$queuedJob = $this->saveOrFail($queuedJob);
219220

220-
return $this->saveOrFail($queuedJob);
221+
$this->dispatchEvent('Queue.Job.created', [
222+
'job' => $queuedJob,
223+
]);
224+
225+
return $queuedJob;
221226
}
222227

223228
/**

src/Queue/Processor.php

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,11 @@ protected function runJob(QueuedJob $queuedJob, string $pid): void {
213213
$this->log('job ' . $queuedJob->job_task . ', id ' . $queuedJob->id, $pid, false);
214214
$taskName = $queuedJob->job_task;
215215

216+
$event = new Event('Queue.Job.started', $this, [
217+
'job' => $queuedJob,
218+
]);
219+
EventManager::instance()->dispatch($event);
220+
216221
$return = $failureMessage = null;
217222
try {
218223
$this->time = time();
@@ -242,6 +247,12 @@ protected function runJob(QueuedJob $queuedJob, string $pid): void {
242247
$this->log('job ' . $queuedJob->job_task . ', id ' . $queuedJob->id . ' failed and ' . $failedStatus, $pid);
243248
$this->io->out('Job did not finish, ' . $failedStatus . ' after try ' . $queuedJob->attempts . '.');
244249

250+
$event = new Event('Queue.Job.failed', $this, [
251+
'job' => $queuedJob,
252+
'failureMessage' => $failureMessage,
253+
]);
254+
EventManager::instance()->dispatch($event);
255+
245256
// Dispatch event when job has exhausted all retries
246257
if ($failedStatus === 'aborted') {
247258
$event = new Event('Queue.Job.maxAttemptsExhausted', $this, [
@@ -255,6 +266,12 @@ protected function runJob(QueuedJob $queuedJob, string $pid): void {
255266
}
256267

257268
$this->QueuedJobs->markJobDone($queuedJob);
269+
270+
$event = new Event('Queue.Job.completed', $this, [
271+
'job' => $queuedJob,
272+
]);
273+
EventManager::instance()->dispatch($event);
274+
258275
$this->io->out('Job Finished.');
259276
$this->currentJob = null;
260277
}

tests/TestCase/Model/Table/QueuedJobsTableTest.php

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111

1212
use Cake\Core\Configure;
1313
use Cake\Datasource\ConnectionManager;
14+
use Cake\Event\EventList;
15+
use Cake\Event\EventManager;
1416
use Cake\I18n\DateTime;
1517
use Cake\ORM\TableRegistry;
1618
use Cake\TestSuite\TestCase;
@@ -752,6 +754,23 @@ public function testGetStats() {
752754
$this->assertWithinRange(7200, (int)$queuedJob->fetchdelay, 1);
753755
}
754756

757+
/**
758+
* Test that Queue.Job.created event is fired when a job is created
759+
*
760+
* @return void
761+
*/
762+
public function testJobCreatedEvent() {
763+
// Set up event tracking
764+
$eventList = new EventList();
765+
EventManager::instance()->setEventList($eventList);
766+
767+
// Create a job
768+
$job = $this->QueuedJobs->createJob('Queue.Example', ['test' => 'data']);
769+
770+
// Check that the created event was dispatched
771+
$this->assertEventFired('Queue.Job.created');
772+
}
773+
755774
/**
756775
* Helper method for skipping tests that need a real connection.
757776
*

tests/TestCase/Queue/ProcessorTest.php

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
use Queue\Model\Entity\QueuedJob;
1616
use Queue\Model\Table\QueuedJobsTable;
1717
use Queue\Queue\Processor;
18+
use Queue\Queue\Task\ExampleTask;
1819
use Queue\Queue\Task\RetryExampleTask;
1920
use ReflectionClass;
2021
use RuntimeException;
@@ -286,6 +287,117 @@ public function testWorkerTimeoutHandlingIntegration() {
286287
}
287288
}
288289

290+
/**
291+
* Test that Queue.Job.started event is fired when job begins processing
292+
*
293+
* @return void
294+
*/
295+
public function testJobStartedEvent() {
296+
// Set up event tracking
297+
$eventList = new EventList();
298+
EventManager::instance()->setEventList($eventList);
299+
300+
// Create a job
301+
$QueuedJobs = $this->getTableLocator()->get('Queue.QueuedJobs');
302+
$job = $QueuedJobs->createJob('Queue.Example', [], ['priority' => 1]);
303+
304+
// Create processor with mock task
305+
$out = new ConsoleOutput();
306+
$err = new ConsoleOutput();
307+
$processor = $this->getMockBuilder(Processor::class)
308+
->setConstructorArgs([new Io(new ConsoleIo($out, $err)), new NullLogger()])
309+
->onlyMethods(['loadTask'])
310+
->getMock();
311+
312+
// Create a mock task that succeeds (run method is void, so no return)
313+
$mockTask = $this->getMockBuilder(ExampleTask::class)
314+
->setConstructorArgs([new Io(new ConsoleIo($out, $err)), new NullLogger()])
315+
->onlyMethods(['run'])
316+
->getMock();
317+
318+
$processor->method('loadTask')->willReturn($mockTask);
319+
320+
// Run the job
321+
$this->invokeMethod($processor, 'runJob', [$job, 'test-pid']);
322+
323+
// Check that the started event was dispatched
324+
$this->assertEventFired('Queue.Job.started');
325+
}
326+
327+
/**
328+
* Test that Queue.Job.completed event is fired when job finishes successfully
329+
*
330+
* @return void
331+
*/
332+
public function testJobCompletedEvent() {
333+
// Set up event tracking
334+
$eventList = new EventList();
335+
EventManager::instance()->setEventList($eventList);
336+
337+
// Create a job
338+
$QueuedJobs = $this->getTableLocator()->get('Queue.QueuedJobs');
339+
$job = $QueuedJobs->createJob('Queue.Example', [], ['priority' => 1]);
340+
341+
// Create processor with mock task
342+
$out = new ConsoleOutput();
343+
$err = new ConsoleOutput();
344+
$processor = $this->getMockBuilder(Processor::class)
345+
->setConstructorArgs([new Io(new ConsoleIo($out, $err)), new NullLogger()])
346+
->onlyMethods(['loadTask'])
347+
->getMock();
348+
349+
// Create a mock task that succeeds (run method is void, so no return)
350+
$mockTask = $this->getMockBuilder(ExampleTask::class)
351+
->setConstructorArgs([new Io(new ConsoleIo($out, $err)), new NullLogger()])
352+
->onlyMethods(['run'])
353+
->getMock();
354+
355+
$processor->method('loadTask')->willReturn($mockTask);
356+
357+
// Run the job
358+
$this->invokeMethod($processor, 'runJob', [$job, 'test-pid']);
359+
360+
// Check that the completed event was dispatched
361+
$this->assertEventFired('Queue.Job.completed');
362+
}
363+
364+
/**
365+
* Test that Queue.Job.failed event is fired when job fails
366+
*
367+
* @return void
368+
*/
369+
public function testJobFailedEvent() {
370+
// Set up event tracking
371+
$eventList = new EventList();
372+
EventManager::instance()->setEventList($eventList);
373+
374+
// Create a job that will fail
375+
$QueuedJobs = $this->getTableLocator()->get('Queue.QueuedJobs');
376+
$job = $QueuedJobs->createJob('Queue.RetryExample', [], ['priority' => 1]);
377+
378+
// Create processor with mock task that fails
379+
$out = new ConsoleOutput();
380+
$err = new ConsoleOutput();
381+
$processor = $this->getMockBuilder(Processor::class)
382+
->setConstructorArgs([new Io(new ConsoleIo($out, $err)), new NullLogger()])
383+
->onlyMethods(['loadTask'])
384+
->getMock();
385+
386+
$mockTask = $this->getMockBuilder(RetryExampleTask::class)
387+
->setConstructorArgs([new Io(new ConsoleIo($out, $err)), new NullLogger()])
388+
->onlyMethods(['run'])
389+
->getMock();
390+
$mockTask->method('run')->willThrowException(new RuntimeException('Task failed'));
391+
392+
$processor->method('loadTask')->willReturn($mockTask);
393+
394+
// Run the job (it will fail)
395+
$this->invokeMethod($processor, 'runJob', [$job, 'test-pid']);
396+
397+
// Check that the failed event was dispatched
398+
$this->assertEventFired('Queue.Job.failed');
399+
}
400+
289401
/**
290402
* Test setPhpTimeout with new config names
291403
*

0 commit comments

Comments
 (0)