Skip to content

Commit 7927e42

Browse files
dereuromarkclaude
andcommitted
Add queue job lifecycle events and Sentry integration
Add events at key lifecycle points to enable integration with monitoring tools like Sentry: - Queue.Job.created: Fired when a job is added to the queue (producer) - Queue.Job.started: Fired when a worker begins processing a job (consumer) - Queue.Job.completed: Fired when a job finishes successfully - Queue.Job.failed: Fired on every job failure (not just when exhausted) Additionally, provide built-in Sentry integration that can be enabled via `Queue.sentry` configuration. When enabled: - Creates queue.publish spans for producer side - Creates queue.process transactions for consumer side - Propagates trace context via job data - Includes all standard messaging attributes Refs: LordSimal/cakephp-sentry#17 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
1 parent cb2221a commit 7927e42

File tree

7 files changed

+523
-4
lines changed

7 files changed

+523
-4
lines changed

composer.json

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,14 @@
3434
"cakedc/cakephp-phpstan": "^4.0.0",
3535
"cakephp/bake": "^3.0.1",
3636
"cakephp/migrations": "^4.5.1",
37+
"dereuromark/cakephp-dto": "^2.1.0",
3738
"dereuromark/cakephp-ide-helper": "^2.0.0",
3839
"dereuromark/cakephp-templating": "^0.2.7",
3940
"dereuromark/cakephp-tools": "^3.0.0",
40-
"dereuromark/cakephp-dto": "^2.1.0",
4141
"fig-r/psr2r-sniffer": "dev-master",
4242
"friendsofcake/search": "^7.0.0",
43-
"phpunit/phpunit": "^10.5 || ^11.5 || ^12.1"
43+
"phpunit/phpunit": "^10.5 || ^11.5 || ^12.1",
44+
"sentry/sentry": "^4.18"
4445
},
4546
"suggest": {
4647
"dereuromark/cakephp-ide-helper": "For maximum IDE support, especially around createJob() usage.",

docs/sections/misc.md

Lines changed: 101 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,64 @@ This includes also failed ones if not filtered further using `where()` condition
5151

5252
## Events
5353
The Queue plugin dispatches events to allow you to hook into the queue processing lifecycle.
54+
These events are useful for monitoring, logging, and integrating with external services like Sentry.
55+
56+
### Queue.Job.created
57+
This event is triggered when a new job is added to the queue (producer side).
58+
59+
```php
60+
use Cake\Event\EventInterface;
61+
use Cake\Event\EventManager;
62+
63+
EventManager::instance()->on('Queue.Job.created', function (EventInterface $event) {
64+
$job = $event->getData('job');
65+
// Track job creation for monitoring
66+
});
67+
```
68+
69+
Event data:
70+
- `job`: The `QueuedJob` entity that was created
71+
72+
### Queue.Job.started
73+
This event is triggered when a worker begins processing a job (consumer side).
74+
75+
```php
76+
EventManager::instance()->on('Queue.Job.started', function (EventInterface $event) {
77+
$job = $event->getData('job');
78+
// Start tracing/monitoring span
79+
});
80+
```
81+
82+
Event data:
83+
- `job`: The `QueuedJob` entity being processed
84+
85+
### Queue.Job.completed
86+
This event is triggered when a job finishes successfully.
87+
88+
```php
89+
EventManager::instance()->on('Queue.Job.completed', function (EventInterface $event) {
90+
$job = $event->getData('job');
91+
// Mark trace as successful
92+
});
93+
```
94+
95+
Event data:
96+
- `job`: The `QueuedJob` entity that completed
97+
98+
### Queue.Job.failed
99+
This event is triggered when a job fails (on every failure attempt).
100+
101+
```php
102+
EventManager::instance()->on('Queue.Job.failed', function (EventInterface $event) {
103+
$job = $event->getData('job');
104+
$failureMessage = $event->getData('failureMessage');
105+
// Mark trace as failed, log error
106+
});
107+
```
108+
109+
Event data:
110+
- `job`: The `QueuedJob` entity that failed
111+
- `failureMessage`: The error message from the failure
54112

55113
### Queue.Job.maxAttemptsExhausted
56114
This event is triggered when a job has failed and exhausted all of its configured retry attempts.
@@ -81,10 +139,52 @@ EventManager::instance()->on('Queue.Job.maxAttemptsExhausted', function (EventIn
81139
});
82140
```
83141

84-
The event data contains:
142+
Event data:
85143
- `job`: The `QueuedJob` entity that failed
86144
- `failureMessage`: The error message from the last failure
87145

146+
### Sentry Integration
147+
148+
The plugin provides built-in support for [Sentry's queue monitoring](https://docs.sentry.io/platforms/php/tracing/instrumentation/queues-module/) feature.
149+
When enabled, it automatically creates producer and consumer spans for queue jobs.
150+
151+
To enable Sentry integration, add to your configuration:
152+
153+
```php
154+
// In config/app.php or config/app_local.php
155+
'Queue' => [
156+
'sentry' => true,
157+
// ... other queue config
158+
],
159+
```
160+
161+
Requirements:
162+
- The `sentry/sentry` package must be installed
163+
- Sentry must be initialized in your application (e.g., via `lordsimal/cakephp-sentry`)
164+
165+
The integration automatically:
166+
- Creates `queue.publish` spans when jobs are created
167+
- Creates `queue.process` transactions when jobs are processed
168+
- Propagates trace context between producer and consumer via job data
169+
- Sets appropriate status (success/error) based on job outcome
170+
- Includes all standard messaging attributes:
171+
- `messaging.message.id` - Job ID
172+
- `messaging.destination.name` - Task name
173+
- `messaging.message.body.size` - Payload size in bytes
174+
- `messaging.message.retry.count` - Attempt count
175+
- `messaging.message.receive.latency` - Time from scheduled to fetched (ms)
176+
177+
### Using Events for Custom Monitoring
178+
179+
If you prefer to implement your own monitoring integration, you can use the events directly.
180+
The job entity provides all necessary data for tracing:
181+
182+
- `$job->id` - Message identifier (`messaging.message.id`)
183+
- `$job->job_task` - Queue/topic name (`messaging.destination.name`)
184+
- `$job->data` - Payload for calculating message size (`messaging.message.body.size`)
185+
- `$job->attempts` - Retry count (`messaging.message.retry.count`)
186+
- `$job->created`, `$job->notbefore`, `$job->fetched` - For calculating receive latency (`messaging.message.receive.latency`)
187+
88188
## Notes
89189

90190
`<TaskName>` is the complete class name without the Task suffix (e.g. Example or PluginName.Example).

src/Model/Table/QueuedJobsTable.php

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
use Queue\Model\Entity\QueuedJob;
1818
use Queue\Model\Filter\QueuedJobsCollection;
1919
use Queue\Queue\Config;
20+
use Queue\Queue\SentryIntegration;
2021
use Queue\Queue\TaskFinder;
2122
use Queue\Utility\Memory;
2223
use RuntimeException;
@@ -205,6 +206,14 @@ public function createJob(string $jobTask, array|object|null $data = null, array
205206
throw new InvalidArgumentException('Data must be `array|null`, implement `' . FromArrayToArrayInterface::class . '` or provide a `toArray()` method');
206207
}
207208

209+
// Add Sentry trace headers for trace propagation if available
210+
$traceHeaders = SentryIntegration::getTraceHeaders();
211+
if ($traceHeaders && is_array($data)) {
212+
$data = $traceHeaders + $data;
213+
} elseif ($traceHeaders) {
214+
$data = $traceHeaders;
215+
}
216+
208217
$queuedJob = [
209218
'job_task' => $this->jobTask($jobTask),
210219
'data' => $data,
@@ -216,8 +225,14 @@ public function createJob(string $jobTask, array|object|null $data = null, array
216225
}
217226

218227
$queuedJob = $this->newEntity($queuedJob);
228+
$queuedJob = $this->saveOrFail($queuedJob);
229+
230+
$this->dispatchEvent('Queue.Job.created', [
231+
'job' => $queuedJob,
232+
]);
233+
SentryIntegration::startProducerSpan($queuedJob);
219234

220-
return $this->saveOrFail($queuedJob);
235+
return $queuedJob;
221236
}
222237

223238
/**

src/Queue/Processor.php

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,12 @@ protected function runJob(QueuedJob $queuedJob, string $pid): void {
213213
$this->log('job ' . $queuedJob->job_task . ', id ' . $queuedJob->id, $pid, false);
214214
$taskName = $queuedJob->job_task;
215215

216+
$event = new Event('Queue.Job.started', $this, [
217+
'job' => $queuedJob,
218+
]);
219+
EventManager::instance()->dispatch($event);
220+
SentryIntegration::startConsumerTransaction($queuedJob);
221+
216222
$return = $failureMessage = null;
217223
try {
218224
$this->time = time();
@@ -242,6 +248,13 @@ protected function runJob(QueuedJob $queuedJob, string $pid): void {
242248
$this->log('job ' . $queuedJob->job_task . ', id ' . $queuedJob->id . ' failed and ' . $failedStatus, $pid);
243249
$this->io->out('Job did not finish, ' . $failedStatus . ' after try ' . $queuedJob->attempts . '.');
244250

251+
$event = new Event('Queue.Job.failed', $this, [
252+
'job' => $queuedJob,
253+
'failureMessage' => $failureMessage,
254+
]);
255+
EventManager::instance()->dispatch($event);
256+
SentryIntegration::finishConsumerFailure($queuedJob, $failureMessage);
257+
245258
// Dispatch event when job has exhausted all retries
246259
if ($failedStatus === 'aborted') {
247260
$event = new Event('Queue.Job.maxAttemptsExhausted', $this, [
@@ -255,6 +268,13 @@ protected function runJob(QueuedJob $queuedJob, string $pid): void {
255268
}
256269

257270
$this->QueuedJobs->markJobDone($queuedJob);
271+
272+
$event = new Event('Queue.Job.completed', $this, [
273+
'job' => $queuedJob,
274+
]);
275+
EventManager::instance()->dispatch($event);
276+
SentryIntegration::finishConsumerSuccess($queuedJob);
277+
258278
$this->io->out('Job Finished.');
259279
$this->currentJob = null;
260280
}

0 commit comments

Comments
 (0)