Commit a912126

Merge pull request #594 from rails/concurrency-on-conflict-discard

Add support for a `discard` option in concurrency controls

2 parents: d036fbf + d23be3c

8 files changed: +185 −19 lines

README.md (18 additions, 4 deletions)

The updated section reads:
## Concurrency controls

Solid Queue extends Active Job with concurrency controls that allow you to limit how many jobs of a certain type or with certain arguments can run at the same time. When limited in this way, by default, jobs will be blocked from running, and they'll stay blocked until another job finishes and unblocks them, or until the set expiry time (the concurrency limit's _duration_) elapses. Alternatively, jobs can be configured to be discarded instead of blocked. This means that if a job with certain arguments has already been enqueued, other jobs with the same characteristics (in the same concurrency _class_) won't be enqueued.

```ruby
class MyJob < ApplicationJob
  limits_concurrency to: max_concurrent_executions, key: ->(arg1, arg2, **) { ... }, duration: max_interval_to_guarantee_concurrency_limit, group: concurrency_group, on_conflict: on_conflict_behaviour

  # ...
```
- `key` is the only required parameter, and it can be a symbol, a string or a proc that receives the job arguments as parameters and is used to identify the jobs that need to be limited together. If the proc returns an Active Record record, the key will be built from its class name and `id`.
- `to` is `1` by default.
- `duration` is set to `SolidQueue.default_concurrency_control_period` by default, which itself defaults to `3 minutes`, but which you can also configure.
- `group` is used to control the concurrency of different job classes together. It defaults to the job class name.
- `on_conflict` controls the behaviour when enqueuing a job that conflicts with the configured concurrency limits. It can be set to one of the following:
  - `:block` (default): the job is blocked and is dispatched when another job completes and unblocks it, or when the duration expires.
  - `:discard`: the job is discarded. When you choose this option, bear in mind that if a job runs and fails to remove the concurrency lock (or _semaphore_; read below to learn more about this), all jobs conflicting with it will be discarded until the interval defined by `duration` has elapsed.
When a job includes these controls, we'll ensure that, at most, the number of jobs (indicated as `to`) that yield the same `key` will be performed concurrently, and this guarantee will last for `duration` for each job enqueued. Note that there's no guarantee about _the order of execution_, only about jobs being performed at the same time (overlapping).

The concurrency limits use the concept of semaphores when enqueuing and work as follows: when a job is enqueued, we check whether it specifies concurrency controls. If it does, we check the semaphore for the computed concurrency key. If the semaphore is open, we claim it and set the job as _ready_. Ready means it can be picked up by workers for execution. When the job finishes executing (be it successfully or unsuccessfully, resulting in a failed execution), we signal the semaphore and try to unblock the next job with the same key, if any. Unblocking the next job doesn't mean running that job right away, but moving it from _blocked_ to _ready_. If you're using the `discard` behaviour for `on_conflict`, jobs enqueued while the semaphore is closed will be discarded.
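The enqueue-time decision described above can be sketched in plain Ruby. This is a simplified in-memory model, not Solid Queue's actual implementation (the real semaphore is a database row keyed by the concurrency key):

```ruby
# Simplified model: the semaphore is a counter with a limit (`to` in the README).
Semaphore = Struct.new(:limit, :claimed) do
  def try_claim          # open? -> claim it
    if claimed < limit
      self.claimed += 1
      true
    else
      false
    end
  end

  def signal             # called when a job finishes, successfully or not
    self.claimed -= 1 if claimed > 0
  end
end

# The dispatch decision: claim the semaphore if open; otherwise block or discard
# depending on the configured on_conflict behaviour.
def dispatch(semaphore, on_conflict: :block)
  if semaphore.try_claim
    :ready        # can be picked up by workers
  elsif on_conflict == :discard
    :discarded    # dropped; it will never run
  else
    :blocked      # waits for a signal, or for the duration failsafe
  end
end
```

With `limit: 1`, a second conflicting job is `:blocked` by default, or `:discarded` when `on_conflict: :discard` is set; signalling the semaphore makes room for the next job.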
Since something can happen that prevents the first job from releasing the semaphore and unblocking the next job (for example, someone pulling the plug on the machine where the worker is running), we have the `duration` as a failsafe. Jobs that have been blocked for longer than `duration` are candidates to be released, but only as many of them as the concurrency rules allow, as each one needs to go through the semaphore check. This means that the `duration` is not really about the job that's enqueued or being run; it's about the jobs that are blocked waiting, or about the jobs that would get discarded while the semaphore is closed.

It's important to note that after one or more candidate jobs are unblocked (either because a job finishes or because `duration` expires and a semaphore is released), the `duration` timer for the still-blocked jobs is reset. This happens indirectly via the expiration time of the semaphore, which is updated.

When using `discard` as the behaviour to handle conflicts, you might have jobs discarded for up to the `duration` interval if something happens and a running job fails to release the semaphore.
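The failsafe can be sketched the same way, assuming a simplified in-memory semaphore with an expiration timestamp (Solid Queue keeps this on the semaphore row in the database, and the dispatcher's concurrency maintenance task checks it periodically):

```ruby
# claimed: how many holders; expires_at: when the claim stops being trusted.
ExpiringSemaphore = Struct.new(:claimed, :expires_at)

# If a crashed worker never signalled the semaphore, release it once its
# expiration (derived from `duration`) has passed, so a blocked job can
# become a candidate for dispatch again.
def release_if_expired(semaphore, now: Time.now)
  return false unless semaphore.claimed > 0 && now >= semaphore.expires_at

  semaphore.claimed -= 1
  true
end
```

Until that release happens, conflicting jobs keep being blocked (or, with `on_conflict: :discard`, discarded), which is exactly the "up to the `duration` interval" caveat above.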
For example:

@@ -483,7 +492,7 @@ In this case, if we have a `Box::MovePostingsByContactToDesignatedBoxJob` job en

Note that the `duration` setting depends indirectly on the value of `concurrency_maintenance_interval` that you set for your dispatcher(s), as that's the frequency with which blocked jobs are checked and unblocked (at which point, at most one job per concurrency key is unblocked). In general, you should set `duration` so that all your jobs finish well under that duration, and think of the concurrency maintenance task as a failsafe in case something goes wrong.

Jobs are unblocked in order of priority, but **queue order is not taken into account for unblocking jobs**. That means that if you have a group of jobs that share a concurrency group but are in different queues, or jobs of the same class that you enqueue in different queues, the queue order you set for a worker is not taken into account when unblocking blocked ones. The reason is that a job that runs unblocks the next one, and the job itself doesn't know about a particular worker's queue order (you could even have different workers with different queue orders); it can only know about priority. Once blocked jobs are unblocked and available for polling, they'll be picked up by a worker following its queue order.
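The unblocking order can be illustrated with a minimal sketch (a hypothetical `BlockedJob` struct; in Solid Queue the selection happens via a database query over blocked executions, assuming the usual convention that a lower priority value runs first):

```ruby
# Hypothetical in-memory model: the next job to unblock is chosen purely by
# priority; queue_name plays no part in the decision.
BlockedJob = Struct.new(:queue_name, :priority)

def next_to_unblock(blocked_jobs)
  blocked_jobs.min_by(&:priority)
end

blocked = [
  BlockedJob.new("real_time",  5),
  BlockedJob.new("background", 1),
  BlockedJob.new("real_time",  10)
]
```

Here the `background` job is unblocked first, even if a worker's queue order prefers `real_time`; queue order only matters once the job is ready and available for polling.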
Finally, failed jobs that are automatically or manually retried work in the same way as new jobs that get enqueued: they get in the queue for an open semaphore, and whenever they get one, they'll be run. It doesn't matter whether they had already gotten an open semaphore in the past.

@@ -510,6 +519,11 @@ production:

Or something similar to that, depending on your setup. You can also assign a different queue to a job at the moment of enqueuing, so you can decide whether to enqueue a job in the throttled queue or another one depending on the arguments, or pass a block to `queue_as` as explained [here](https://guides.rubyonrails.org/active_job_basics.html#queues).

In addition, mixing concurrency controls with **bulk enqueuing** (Active Job's `perform_all_later`) is not a good idea, because concurrency-controlled jobs need to be enqueued one by one to ensure concurrency limits are respected, so you lose all the benefits of bulk enqueuing.

When jobs that have concurrency controls and `on_conflict: :discard` are enqueued in bulk, the ones that fail to be enqueued and are discarded will have `successfully_enqueued` set to `false`. The total count of jobs enqueued returned by `perform_all_later` will exclude these jobs, as expected.
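A caller can separate the discarded jobs after a bulk enqueue by checking that flag. A sketch, using a stand-in struct for the Active Job instances (which expose `successfully_enqueued?` after enqueuing):

```ruby
# FakeJob stands in for an Active Job instance after perform_all_later.
FakeJob = Struct.new(:name, :enqueued) do
  def successfully_enqueued?
    !!enqueued
  end
end

jobs = [FakeJob.new("a", true), FakeJob.new("b", false), FakeJob.new("c", true)]

# The count of successfully enqueued jobs matches what perform_all_later
# returns: discarded conflicts are excluded.
enqueued_count = jobs.count(&:successfully_enqueued?)
discarded      = jobs.reject(&:successfully_enqueued?).map(&:name)
```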
## Failed jobs and retries

Solid Queue doesn't include any automatic retry mechanism; it [relies on Active Job for this](https://edgeguides.rubyonrails.org/active_job_basics.html#retrying-or-discarding-failed-jobs). Jobs that fail will be kept in the system, and a _failed execution_ (a record in the `solid_queue_failed_executions` table) will be created for each of them. The job will stay there until manually discarded or re-enqueued. You can do this in a console as:

app/models/solid_queue/job.rb (2 additions, 2 deletions)

```diff
@@ -29,7 +29,7 @@ def enqueue(active_job, scheduled_at: Time.current)
   active_job.scheduled_at = scheduled_at

   create_from_active_job(active_job).tap do |enqueued_job|
-    active_job.provider_job_id = enqueued_job.id
+    active_job.provider_job_id = enqueued_job.id if enqueued_job.persisted?
   end
 end

@@ -49,7 +49,7 @@ def create_from_active_job(active_job)
 def create_all_from_active_jobs(active_jobs)
   job_rows = active_jobs.map { |job| attributes_from_active_job(job) }
   insert_all(job_rows)
-  where(active_job_id: active_jobs.map(&:job_id))
+  where(active_job_id: active_jobs.map(&:job_id)).order(id: :asc)
 end

 def attributes_from_active_job(active_job)
```

app/models/solid_queue/job/concurrency_controls.rb (12 additions, 0 deletions)

```diff
@@ -34,6 +34,10 @@ def blocked?
   end

   private
+    def concurrency_on_conflict
+      job_class.concurrency_on_conflict.to_s.inquiry
+    end
+
     def acquire_concurrency_lock
       return true unless concurrency_limited?

@@ -46,6 +50,14 @@ def release_concurrency_lock
       Semaphore.signal(self)
     end

+    def handle_concurrency_conflict
+      if concurrency_on_conflict.discard?
+        destroy
+      else
+        block
+      end
+    end
+
     def block
       BlockedExecution.create_or_find_by!(job_id: id)
     end
```
app/models/solid_queue/job/executable.rb (1 addition, 1 deletion)

```diff
@@ -67,7 +67,7 @@ def prepare_for_execution
 def dispatch
   if acquire_concurrency_lock then ready
   else
-    block
+    handle_concurrency_conflict
   end
 end
```
lib/active_job/concurrency_controls.rb (4 additions, 1 deletion)

```diff
@@ -5,21 +5,24 @@ module ConcurrencyControls
   extend ActiveSupport::Concern

   DEFAULT_CONCURRENCY_GROUP = ->(*) { self.class.name }
+  CONCURRENCY_ON_CONFLICT_BEHAVIOUR = %i[ block discard ]

   included do
     class_attribute :concurrency_key, instance_accessor: false
     class_attribute :concurrency_group, default: DEFAULT_CONCURRENCY_GROUP, instance_accessor: false

     class_attribute :concurrency_limit
     class_attribute :concurrency_duration, default: SolidQueue.default_concurrency_control_period
+    class_attribute :concurrency_on_conflict, default: :block
   end

   class_methods do
-    def limits_concurrency(key:, to: 1, group: DEFAULT_CONCURRENCY_GROUP, duration: SolidQueue.default_concurrency_control_period)
+    def limits_concurrency(key:, to: 1, group: DEFAULT_CONCURRENCY_GROUP, duration: SolidQueue.default_concurrency_control_period, on_conflict: :block)
       self.concurrency_key = key
       self.concurrency_limit = to
       self.concurrency_group = group
       self.concurrency_duration = duration
+      self.concurrency_on_conflict = on_conflict.presence_in(CONCURRENCY_ON_CONFLICT_BEHAVIOUR) || :block
     end
   end
```
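`presence_in` comes from Active Support: it returns the receiver if it's included in the given collection, and `nil` otherwise, so an unknown `on_conflict` value silently falls back to `:block`. A plain-Ruby sketch of that validation (the method name is ours, not part of the library):

```ruby
# The two behaviours the concern accepts, mirroring the diff above.
ALLOWED_ON_CONFLICT = %i[ block discard ]

# Plain-Ruby stand-in for `value.presence_in(ALLOWED_ON_CONFLICT) || :block`.
def normalize_on_conflict(value)
  ALLOWED_ON_CONFLICT.include?(value) ? value : :block
end
```

This means a typo like `on_conflict: :drop` doesn't raise; it just behaves like the default `:block`.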

New test job file (3 additions, 0 deletions)

```diff
@@ -0,0 +1,3 @@
+class DiscardableUpdateResultJob < UpdateResultJob
+  limits_concurrency key: ->(job_result, **) { job_result }, on_conflict: :discard
+end
```

test/integration/concurrency_controls_test.rb (53 additions, 0 deletions)

```diff
@@ -196,6 +196,59 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
     end
   end

+  test "discard jobs when concurrency limit is reached with on_conflict: :discard" do
+    job1 = DiscardableUpdateResultJob.perform_later(@result, name: "1", pause: 3)
+    # should be discarded due to concurrency limit
+    job2 = DiscardableUpdateResultJob.perform_later(@result, name: "2")
+    # should also be discarded
+    job3 = DiscardableUpdateResultJob.perform_later(@result, name: "3")
+
+    wait_for_jobs_to_finish_for(5.seconds)
+    assert_no_unfinished_jobs
+
+    # Only the first job did something
+    assert_stored_sequence(@result, [ "1" ])
+
+    # All jobs have finished and have no blocked executions
+    jobs = SolidQueue::Job.where(active_job_id: [ job1, job2, job3 ].map(&:job_id))
+    assert_equal 1, jobs.count
+
+    assert_equal job1.provider_job_id, jobs.first.id
+    assert_nil job2.provider_job_id
+    assert_nil job3.provider_job_id
+  end
+
+  test "discard on conflict across different concurrency keys" do
+    another_result = JobResult.create!(queue_name: "default", status: "seq: ")
+    DiscardableUpdateResultJob.perform_later(@result, name: "1", pause: 0.2)
+    DiscardableUpdateResultJob.perform_later(another_result, name: "2", pause: 0.2)
+    DiscardableUpdateResultJob.perform_later(@result, name: "3") # Should be discarded
+    DiscardableUpdateResultJob.perform_later(another_result, name: "4") # Should be discarded
+
+    wait_for_jobs_to_finish_for(5.seconds)
+    assert_no_unfinished_jobs
+
+    # Only the first 2 jobs did something
+    assert_stored_sequence(@result, [ "1" ])
+    assert_stored_sequence(another_result, [ "2" ])
+  end
+
+  test "discard on conflict and release semaphore" do
+    DiscardableUpdateResultJob.perform_later(@result, name: "1", pause: 0.1)
+    # will be discarded
+    DiscardableUpdateResultJob.perform_later(@result, name: "2")
+
+    wait_for_jobs_to_finish_for(5.seconds)
+    assert_no_unfinished_jobs
+
+    # Enqueue another job that shouldn't be discarded or blocked
+    DiscardableUpdateResultJob.perform_later(@result, name: "3")
+    wait_for_jobs_to_finish_for(5.seconds)
+    assert_no_unfinished_jobs
+
+    assert_stored_sequence(@result, [ "1", "3" ])
+  end
+
   private
     def assert_stored_sequence(result, *sequences)
       expected = sequences.map { |sequence| "seq: " + sequence.map { |name| "s#{name}c#{name}" }.join }
```
