Commit d23be3c

Change the approach, destroying the job when it can't be enqueued

Otherwise we'd get inconsistent behaviour with other adapters like Sidekiq, where only jobs that are actually enqueued get assigned a provider_job_id. It turns out that issuing a DELETE query for a record that got inserted within the same transaction works perfectly, even with bulk INSERT, so let's just avoid creating those jobs altogether.

1 parent ddfe719 · commit d23be3c

File tree: 7 files changed (+45 −215 lines)

README.md (13 additions & 5 deletions)
````diff
@@ -428,7 +428,7 @@ In the case of recurring tasks, if such error is raised when enqueuing the job c
 
 ## Concurrency controls
 
-Solid Queue extends Active Job with concurrency controls, that allows you to limit how many jobs of a certain type or with certain arguments can run at the same time. When limited in this way, by default, jobs will be blocked from running, and they'll stay blocked until another job finishes and unblocks them, or after the set expiry time (concurrency limit's _duration_) elapses. Alternatively, jobs can be configured to be discarded instead of blocked. This means that if a job with certain arguments has already been enqueued, other jobs with the same characteristics (in the same concurrency _class_) won't be enqueued, they'll silently complete.
+Solid Queue extends Active Job with concurrency controls, that allows you to limit how many jobs of a certain type or with certain arguments can run at the same time. When limited in this way, by default, jobs will be blocked from running, and they'll stay blocked until another job finishes and unblocks them, or after the set expiry time (concurrency limit's _duration_) elapses. Alternatively, jobs can be configured to be discarded instead of blocked. This means that if a job with certain arguments has already been enqueued, other jobs with the same characteristics (in the same concurrency _class_) won't be enqueued.
 
 ```ruby
 class MyJob < ApplicationJob
````
```diff
@@ -442,11 +442,17 @@ class MyJob < ApplicationJob
 - `group` is used to control the concurrency of different job classes together. It defaults to the job class name.
 - `on_conflict` controls behaviour when enqueuing a job that conflicts with the concurrency limits configured. It can be set to one of the following:
   - (default) `:block`: the job is blocked and is dispatched when another job completes and unblocks it, or when the duration expires.
-  - `:discard`: the job is discarded (silently finishes). When you choose this option, bear in mind that if a job runs and fails to remove the concurrency lock (or _semaphore_, read below to know more about this), all jobs conflicting with it will be discarded up to the interval defined by `duration` has elapsed. Additionally, Active Jobs that are discarded will have `successfully_enqueued` set to `false`.
+  - `:discard`: the job is discarded. When you choose this option, bear in mind that if a job runs and fails to remove the concurrency lock (or _semaphore_, read below to know more about this), all jobs conflicting with it will be discarded up to the interval defined by `duration` has elapsed.
 
 When a job includes these controls, we'll ensure that, at most, the number of jobs (indicated as `to`) that yield the same `key` will be performed concurrently, and this guarantee will last for `duration` for each job enqueued. Note that there's no guarantee about _the order of execution_, only about jobs being performed at the same time (overlapping).
 
-The concurrency limits use the concept of semaphores when enqueuing, and work as follows: when a job is enqueued, we check if it specifies concurrency controls. If it does, we check the semaphore for the computed concurrency key. If the semaphore is open, we claim it and we set the job as _ready_. Ready means it can be picked up by workers for execution. When the job finishes executing (be it successfully or unsuccessfully, resulting in a failed execution), we signal the semaphore and try to unblock the next job with the same key, if any. Unblocking the next job doesn't mean running that job right away, but moving it from _blocked_ to _ready_. If you're using the `discard` behaviour for `on_conflict`, jobs enqueued while the semaphore is closed will be discarded. Since something can happen that prevents the first job from releasing the semaphore and unblocking the next job (for example, someone pulling a plug in the machine where the worker is running), we have the `duration` as a failsafe. Jobs that have been blocked for more than duration are candidates to be released, but only as many of them as the concurrency rules allow, as each one would need to go through the semaphore dance check. This means that the `duration` is not really about the job that's enqueued or being run, it's about the jobs that are blocked waiting. It's important to note that after one or more candidate jobs are unblocked (either because a job finishes or because `duration` expires and a semaphore is released), the `duration` timer for the still blocked jobs is reset. This happens indirectly via the expiration time of the semaphore, which is updated. In a similar way, when using `discard` as the behaviour to handle conflicts, you might have jobs discarded for up to the `duration` interval if something happens and a running job fails to release the semaphore.
+The concurrency limits use the concept of semaphores when enqueuing, and work as follows: when a job is enqueued, we check if it specifies concurrency controls. If it does, we check the semaphore for the computed concurrency key. If the semaphore is open, we claim it and we set the job as _ready_. Ready means it can be picked up by workers for execution. When the job finishes executing (be it successfully or unsuccessfully, resulting in a failed execution), we signal the semaphore and try to unblock the next job with the same key, if any. Unblocking the next job doesn't mean running that job right away, but moving it from _blocked_ to _ready_. If you're using the `discard` behaviour for `on_conflict`, jobs enqueued while the semaphore is closed will be discarded.
+
+Since something can happen that prevents the first job from releasing the semaphore and unblocking the next job (for example, someone pulling a plug in the machine where the worker is running), we have the `duration` as a failsafe. Jobs that have been blocked for more than `duration` are candidates to be released, but only as many of them as the concurrency rules allow, as each one would need to go through the semaphore dance check. This means that the `duration` is not really about the job that's enqueued or being run, it's about the jobs that are blocked waiting, or about the jobs that would get discarded while the semaphore is closed.
+
+It's important to note that after one or more candidate jobs are unblocked (either because a job finishes or because `duration` expires and a semaphore is released), the `duration` timer for the still blocked jobs is reset. This happens indirectly via the expiration time of the semaphore, which is updated.
+
+When using `discard` as the behaviour to handle conflicts, you might have jobs discarded for up to the `duration` interval if something happens and a running job fails to release the semaphore.
 
 
 For example:
```
```diff
@@ -481,7 +487,7 @@ In this case, if we have a `Box::MovePostingsByContactToDesignatedBoxJob` job en
 
 Note that the `duration` setting depends indirectly on the value for `concurrency_maintenance_interval` that you set for your dispatcher(s), as that'd be the frequency with which blocked jobs are checked and unblocked (at which point, only one job per concurrency key, at most, is unblocked). In general, you should set `duration` in a way that all your jobs would finish well under that duration and think of the concurrency maintenance task as a failsafe in case something goes wrong.
 
-Jobs are unblocked in order of priority but queue order is not taken into account for unblocking jobs. That means that if you have a group of jobs that share a concurrency group but are in different queues, or jobs of the same class that you enqueue in different queues, the queue order you set for a worker is not taken into account when unblocking blocked ones. The reason is that a job that runs unblocks the next one, and the job itself doesn't know about a particular worker's queue order (you could even have different workers with different queue orders), it can only know about priority. Once blocked jobs are unblocked and available for polling, they'll be picked up by a worker following its queue order.
+Jobs are unblocked in order of priority but **queue order is not taken into account for unblocking jobs**. That means that if you have a group of jobs that share a concurrency group but are in different queues, or jobs of the same class that you enqueue in different queues, the queue order you set for a worker is not taken into account when unblocking blocked ones. The reason is that a job that runs unblocks the next one, and the job itself doesn't know about a particular worker's queue order (you could even have different workers with different queue orders), it can only know about priority. Once blocked jobs are unblocked and available for polling, they'll be picked up by a worker following its queue order.
 
 Finally, failed jobs that are automatically or manually retried work in the same way as new jobs that get enqueued: they get in the queue for getting an open semaphore, and whenever they get it, they'll be run. It doesn't matter if they had already gotten an open semaphore in the past.
 
```
```diff
@@ -509,7 +515,9 @@ production:
 Or something similar to that depending on your setup. You can also assign a different queue to a job on the moment of enqueuing so you can decide whether to enqueue a job in the throttled queue or another queue depending on the arguments, or pass a block to `queue_as` as explained [here](https://guides.rubyonrails.org/active_job_basics.html#queues).
 
 
-In addition, mixing concurrency controls with bulk enqueuing (Active Job's `perform_all_later`) is not a good idea because concurrency controlled job needs to be enqueued one by one to ensure concurrency limits are respected, so you lose all the benefits of bulk enqueuing.
+In addition, mixing concurrency controls with **bulk enqueuing** (Active Job's `perform_all_later`) is not a good idea because concurrency controlled job needs to be enqueued one by one to ensure concurrency limits are respected, so you lose all the benefits of bulk enqueuing.
+
+When jobs that have concurrency controls and `on_conflict: :discard` are enqueued in bulk, the ones that fail to be enqueued and are discarded would have `successfully_enqueued` set to `false`. The total count of jobs enqueued returned by `perform_all_later` will exclude these jobs as expected.
 
 ## Failed jobs and retries
```
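A hypothetical caller-side sketch of the bulk-enqueue behaviour the new paragraph describes: after a bulk enqueue with `on_conflict: :discard`, discarded jobs report `successfully_enqueued?` as false. The `BulkJob` struct below is an invented stand-in for Active Job instances; only the `successfully_enqueued?` predicate mirrors the real Active Job API.

```ruby
# Invented stand-in for Active Job instances after a bulk enqueue;
# only the successfully_enqueued? predicate mirrors Active Job's API.
BulkJob = Struct.new(:name, :successfully_enqueued) do
  def successfully_enqueued?
    !!successfully_enqueued
  end
end

# Suppose "b" conflicted with a concurrency limit and was discarded:
jobs = [BulkJob.new("a", true), BulkJob.new("b", false), BulkJob.new("c", true)]

# Split the batch into what was actually enqueued vs. discarded.
enqueued, discarded = jobs.partition(&:successfully_enqueued?)
puts "enqueued: #{enqueued.map(&:name)}, discarded: #{discarded.map(&:name)}"
# The count returned by the bulk enqueue would match enqueued.size here.
```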

app/models/solid_queue/job.rb (8 additions & 13 deletions)
```diff
@@ -10,31 +10,26 @@ class EnqueueError < StandardError; end
 
   class << self
     def enqueue_all(active_jobs)
-      enqueued_jobs_count = 0
+      active_jobs_by_job_id = active_jobs.index_by(&:job_id)
 
       transaction do
         jobs = create_all_from_active_jobs(active_jobs)
-        prepare_all_for_execution(jobs)
-        jobs_by_active_job_id = jobs.index_by(&:active_job_id)
-
-        active_jobs.each do |active_job|
-          job = jobs_by_active_job_id[active_job.job_id]
-
-          active_job.provider_job_id = job&.id
-          active_job.enqueue_error = job&.enqueue_error
-          active_job.successfully_enqueued = job.present? && job.enqueue_error.nil?
-          enqueued_jobs_count += 1 if active_job.successfully_enqueued?
+        prepare_all_for_execution(jobs).tap do |enqueued_jobs|
+          enqueued_jobs.each do |enqueued_job|
+            active_jobs_by_job_id[enqueued_job.active_job_id].provider_job_id = enqueued_job.id
+            active_jobs_by_job_id[enqueued_job.active_job_id].successfully_enqueued = true
+          end
         end
       end
 
-      enqueued_jobs_count
+      active_jobs.count(&:successfully_enqueued?)
     end
 
     def enqueue(active_job, scheduled_at: Time.current)
       active_job.scheduled_at = scheduled_at
 
       create_from_active_job(active_job).tap do |enqueued_job|
-        active_job.provider_job_id = enqueued_job.id
+        active_job.provider_job_id = enqueued_job.id if enqueued_job.persisted?
       end
     end
```
app/models/solid_queue/job/concurrency_controls.rb (1 addition & 1 deletion)
```diff
@@ -52,7 +52,7 @@ def release_concurrency_lock
 
     def handle_concurrency_conflict
       if concurrency_on_conflict.discard?
-        finished!
+        destroy
       else
         block
       end
```

app/models/solid_queue/job/executable.rb (1 addition & 9 deletions)
```diff
@@ -13,8 +13,6 @@ module Executable
 
       after_create :prepare_for_execution
 
-      attr_accessor :enqueue_error
-
       scope :finished, -> { where.not(finished_at: nil) }
     end
 
@@ -39,13 +37,7 @@ def dispatch_all_at_once(jobs)
       end
 
       def dispatch_all_one_by_one(jobs)
-        jobs.each do |job|
-          begin
-            job.dispatch
-          rescue EnqueueError => e
-            job.enqueue_error = e
-          end
-        end
+        jobs.each(&:dispatch)
       end
 
       def successfully_dispatched(jobs)
```

test/integration/concurrency_controls_test.rb (7 additions & 9 deletions)
```diff
@@ -197,11 +197,10 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
   end
 
   test "discard jobs when concurrency limit is reached with on_conflict: :discard" do
-    # Enqueue first job - should be executed
-    job1 = DiscardableUpdateResultJob.perform_later(@result, name: "1", pause: 0.2)
-    # Enqueue second job - should be discarded due to concurrency limit
+    job1 = DiscardableUpdateResultJob.perform_later(@result, name: "1", pause: 3)
+    # should be discarded due to concurrency limit
     job2 = DiscardableUpdateResultJob.perform_later(@result, name: "2")
-    # Enqueue third job - should also be discarded
+    # should also be discarded
     job3 = DiscardableUpdateResultJob.perform_later(@result, name: "3")
 
     wait_for_jobs_to_finish_for(5.seconds)
@@ -212,12 +211,11 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
 
     # All jobs have finished and have no blocked executions
     jobs = SolidQueue::Job.where(active_job_id: [ job1, job2, job3 ].map(&:job_id))
-    assert_equal 3, jobs.count
+    assert_equal 1, jobs.count
 
-    jobs.each do |job|
-      assert job.finished?
-      assert_nil job.blocked_execution
-    end
+    assert_equal job1.provider_job_id, jobs.first.id
+    assert_nil job2.provider_job_id
+    assert_nil job3.provider_job_id
   end
 
   test "discard on conflict across different concurrency keys" do
```

test/models/solid_queue/job_test.rb (15 additions & 24 deletions)
```diff
@@ -176,11 +176,10 @@ class DiscardableNonOverlappingGroupedJob2 < NonOverlappingJob
 
     travel_to 10.minutes.from_now
 
-    count = SolidQueue::ScheduledExecution.dispatch_next_batch(10)
-
-    assert_not scheduled_job.reload.scheduled?
-    assert_not scheduled_job.ready?
-    assert scheduled_job.finished?
+    # The scheduled job is not enqueued because it conflicts with
+    # the first one and is discarded
+    assert_equal 0, SolidQueue::ScheduledExecution.dispatch_next_batch(10)
+    assert_nil SolidQueue::Job.find_by(id: scheduled_job.id)
   end
 
   test "enqueue jobs in bulk" do
@@ -212,16 +211,20 @@ class DiscardableNonOverlappingGroupedJob2 < NonOverlappingJob
       NonOverlappingJob.new(@result),
       AddToBufferJob.new(6).set(wait: 2.minutes),
       NonOverlappingJob.new(@result),
-      DiscardableNonOverlappingJob.new(@result)
+      DiscardableNonOverlappingJob.new(@result) # this one won't be enqueued
     ]
+    not_enqueued = active_jobs.last
 
-    assert_job_counts(ready: 3, scheduled: 1, blocked: 1, discarded: 1) do
+    assert_job_counts(ready: 3, scheduled: 1, blocked: 1) do
       ActiveJob.perform_all_later(active_jobs)
     end
 
-    jobs = SolidQueue::Job.last(6)
-    assert_equal active_jobs.map(&:provider_job_id).sort, jobs.pluck(:id).sort
-    assert active_jobs.all?(&:successfully_enqueued?)
+    jobs = SolidQueue::Job.last(5)
+    assert_equal active_jobs.without(not_enqueued).map(&:provider_job_id).sort, jobs.pluck(:id).sort
+    assert active_jobs.without(not_enqueued).all?(&:successfully_enqueued?)
+
+    assert_nil not_enqueued.provider_job_id
+    assert_not not_enqueued.successfully_enqueued?
   end
 
   test "discard ready job" do
@@ -334,13 +337,6 @@ class DiscardableNonOverlappingGroupedJob2 < NonOverlappingJob
     assert_raises SolidQueue::Job::EnqueueError do
       AddToBufferJob.perform_later(1)
     end
-
-    # #perform_later with a block doesn't raise ActiveJob::EnqueueError,
-    # and instead set's successfully_enqueued? to false
-    assert_not AddToBufferJob.perform_later(1) do |active_job|
-      assert_not active_job.successfully_enqueued?
-      assert_equal SolidQueue::Job::EnqueueError, active_job.enqueue_error.class
-    end
   end
 
   test "enqueue successfully inside a rolled-back transaction in the app DB" do
@@ -376,13 +372,8 @@ def assert_blocked(&block)
     assert SolidQueue::Job.last.blocked?
   end
 
-  def assert_discarded(&block)
-    assert_job_counts(discarded: 1, &block)
-    assert SolidQueue::Job.last.finished?
-  end
-
-  def assert_job_counts(ready: 0, scheduled: 0, blocked: 0, discarded: 0, &block)
-    assert_difference -> { SolidQueue::Job.count }, +(ready + scheduled + blocked + discarded) do
+  def assert_job_counts(ready: 0, scheduled: 0, blocked: 0, &block)
+    assert_difference -> { SolidQueue::Job.count }, +(ready + scheduled + blocked) do
       assert_difference -> { SolidQueue::ReadyExecution.count }, +ready do
         assert_difference -> { SolidQueue::ScheduledExecution.count }, +scheduled do
           assert_difference -> { SolidQueue::BlockedExecution.count }, +blocked, &block
```