Add support for a discard option in concurrency controls #594

Merged — 6 commits, Jul 11, 2025
22 changes: 18 additions & 4 deletions README.md

## Concurrency controls

Solid Queue extends Active Job with concurrency controls that allow you to limit how many jobs of a certain type or with certain arguments can run at the same time. When limited in this way, jobs are blocked from running by default, and they'll stay blocked until another job finishes and unblocks them, or until the set expiry time (the concurrency limit's _duration_) elapses. Alternatively, jobs can be configured to be discarded instead of blocked. This means that if a job with certain arguments has already been enqueued, other jobs with the same characteristics (in the same concurrency _class_) won't be enqueued.

```ruby
class MyJob < ApplicationJob
limits_concurrency to: max_concurrent_executions, key: ->(arg1, arg2, **) { ... }, duration: max_interval_to_guarantee_concurrency_limit, group: concurrency_group
limits_concurrency to: max_concurrent_executions, key: ->(arg1, arg2, **) { ... }, duration: max_interval_to_guarantee_concurrency_limit, group: concurrency_group, on_conflict: on_conflict_behaviour

# ...
```
- `key` is the only required parameter, and it can be a symbol, a string or a proc that receives the job arguments as parameters and will be used to identify the jobs that need to be limited together. If the proc returns an Active Record record, the key will be built from its class name and `id`.
- `to` is `1` by default.
- `duration` is set to `SolidQueue.default_concurrency_control_period` by default, which itself defaults to `3 minutes`, but that you can configure as well.
- `group` is used to control the concurrency of different job classes together. It defaults to the job class name.
- `on_conflict` controls behaviour when enqueuing a job that conflicts with the concurrency limits configured. It can be set to one of the following:
- (default) `:block`: the job is blocked and is dispatched when another job completes and unblocks it, or when the duration expires.
- `:discard`: the job is discarded. When you choose this option, bear in mind that if a job runs and fails to remove the concurrency lock (or _semaphore_; read on to learn more about this), all jobs conflicting with it will be discarded until the interval defined by `duration` has elapsed.
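
For instance, a hypothetical job that discards duplicate enqueues (the class name, key, and values here are illustrative, not part of Solid Queue):

```ruby
class SyncAccountJob < ApplicationJob
  # Illustrative sketch: at most one sync per account runs at a time;
  # a conflicting enqueue for the same account is discarded, not blocked.
  limits_concurrency key: ->(account) { account }, to: 1, duration: 5.minutes, on_conflict: :discard

  def perform(account)
    # ... sync the account ...
  end
end
```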

When a job includes these controls, we'll ensure that, at most, the number of jobs (indicated as `to`) that yield the same `key` will be performed concurrently, and this guarantee will last for `duration` for each job enqueued. Note that there's no guarantee about _the order of execution_, only about jobs being performed at the same time (overlapping).

The concurrency limits use the concept of semaphores when enqueuing, and work as follows: when a job is enqueued, we check if it specifies concurrency controls. If it does, we check the semaphore for the computed concurrency key. If the semaphore is open, we claim it and we set the job as _ready_. Ready means it can be picked up by workers for execution. When the job finishes executing (be it successfully or unsuccessfully, resulting in a failed execution), we signal the semaphore and try to unblock the next job with the same key, if any. Unblocking the next job doesn't mean running that job right away, but moving it from _blocked_ to _ready_. If you're using the `:discard` behaviour for `on_conflict`, jobs enqueued while the semaphore is closed will be discarded instead.

Since something can happen that prevents the first job from releasing the semaphore and unblocking the next job (for example, someone pulling a plug in the machine where the worker is running), we have the `duration` as a failsafe. Jobs that have been blocked for more than `duration` are candidates to be released, but only as many of them as the concurrency rules allow, as each one would need to go through the semaphore dance check. This means that the `duration` is not really about the job that's enqueued or being run, it's about the jobs that are blocked waiting, or about the jobs that would get discarded while the semaphore is closed.

It's important to note that after one or more candidate jobs are unblocked (either because a job finishes or because `duration` expires and a semaphore is released), the `duration` timer for the still blocked jobs is reset. This happens indirectly via the expiration time of the semaphore, which is updated.
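
The lifecycle above can be sketched as a simplified in-memory model (a toy illustration only, not Solid Queue's actual implementation, which persists semaphores and executions in the database):

```ruby
# Toy model of the semaphore flow: claim on enqueue, signal on completion.
class ToySemaphore
  def initialize(limit)
    @limit = limit
    @count = 0
  end

  def claim
    return false if @count >= @limit
    @count += 1
    true
  end

  def signal
    @count = [@count - 1, 0].max
  end
end

# Returns the state a newly enqueued job would end up in.
def dispatch(semaphore, on_conflict: :block)
  if semaphore.claim
    :ready      # open semaphore: job can be picked up by workers
  elsif on_conflict == :discard
    :discarded  # closed semaphore with on_conflict: :discard
  else
    :blocked    # closed semaphore: job waits to be unblocked
  end
end

sem = ToySemaphore.new(1)
dispatch(sem)                         # => :ready
dispatch(sem)                         # => :blocked
dispatch(sem, on_conflict: :discard)  # => :discarded
sem.signal                            # a finishing job reopens the semaphore
dispatch(sem)                         # => :ready
```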

When using `:discard` as the behaviour to handle conflicts, you might have jobs discarded for up to the `duration` interval if something happens and a running job fails to release the semaphore.


For example:

Note that the `duration` setting depends indirectly on the value for `concurrency_maintenance_interval` that you set for your dispatcher(s), as that'd be the frequency with which blocked jobs are checked and unblocked (at which point, only one job per concurrency key, at most, is unblocked). In general, you should set `duration` in a way that all your jobs would finish well under that duration and think of the concurrency maintenance task as a failsafe in case something goes wrong.
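
The maintenance interval is part of the dispatcher configuration; a sketch (the interval value here is illustrative):

```yml
# config/queue.yml — check for expired semaphores and blocked jobs
# every 5 minutes
production:
  dispatchers:
    - polling_interval: 1
      batch_size: 500
      concurrency_maintenance_interval: 300
```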

Jobs are unblocked in order of priority but **queue order is not taken into account for unblocking jobs**. That means that if you have a group of jobs that share a concurrency group but are in different queues, or jobs of the same class that you enqueue in different queues, the queue order you set for a worker is not taken into account when unblocking blocked ones. The reason is that a job that runs unblocks the next one, and the job itself doesn't know about a particular worker's queue order (you could even have different workers with different queue orders), it can only know about priority. Once blocked jobs are unblocked and available for polling, they'll be picked up by a worker following its queue order.

Finally, failed jobs that are automatically or manually retried work in the same way as new jobs that get enqueued: they get in the queue for getting an open semaphore, and whenever they get it, they'll be run. It doesn't matter if they had already gotten an open semaphore in the past.


Or something similar to that depending on your setup. You can also assign a different queue to a job on the moment of enqueuing so you can decide whether to enqueue a job in the throttled queue or another queue depending on the arguments, or pass a block to `queue_as` as explained [here](https://guides.rubyonrails.org/active_job_basics.html#queues).
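
The per-enqueue decision can use `set(queue:)` at enqueue time or a `queue_as` block; a sketch (the class and the `heavy?` helper are made up for illustration):

```ruby
class ThrottledSyncJob < ApplicationJob
  # Route heavy payloads to the throttled (limited-concurrency) queue,
  # everything else to the default queue. `heavy?` is a hypothetical helper.
  queue_as do
    heavy?(arguments.first) ? :throttled : :default
  end
end

# Or decide at the moment of enqueuing:
ThrottledSyncJob.set(queue: :throttled).perform_later(payload)
```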


In addition, mixing concurrency controls with **bulk enqueuing** (Active Job's `perform_all_later`) is not a good idea, because concurrency-controlled jobs need to be enqueued one by one to ensure concurrency limits are respected, so you lose all the benefits of bulk enqueuing.

When jobs with concurrency controls and `on_conflict: :discard` are enqueued in bulk, the ones that fail to be enqueued and are discarded will have `successfully_enqueued?` return `false`. The total count of jobs enqueued returned by `perform_all_later` excludes these jobs, as expected.
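
A sketch of detecting discarded jobs after a bulk enqueue (`DiscardableJob` and `record` are hypothetical):

```ruby
jobs = 3.times.map { DiscardableJob.new(record) }
ActiveJob.perform_all_later(jobs)

discarded = jobs.reject(&:successfully_enqueued?)
# React to discarded jobs here, e.g. log how many were dropped
Rails.logger.info("#{discarded.size} jobs discarded by concurrency controls")
```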

## Failed jobs and retries

Solid Queue doesn't include any automatic retry mechanism, it [relies on Active Job for this](https://edgeguides.rubyonrails.org/active_job_basics.html#retrying-or-discarding-failed-jobs). Jobs that fail will be kept in the system, and a _failed execution_ (a record in the `solid_queue_failed_executions` table) will be created for these. The job will stay there until manually discarded or re-enqueued. You can do this in a console as:
4 changes: 2 additions & 2 deletions app/models/solid_queue/job.rb
def enqueue(active_job, scheduled_at: Time.current)
active_job.scheduled_at = scheduled_at

create_from_active_job(active_job).tap do |enqueued_job|
active_job.provider_job_id = enqueued_job.id if enqueued_job.persisted?
end
end

def create_all_from_active_jobs(active_jobs)
job_rows = active_jobs.map { |job| attributes_from_active_job(job) }
insert_all(job_rows)
where(active_job_id: active_jobs.map(&:job_id)).order(id: :asc)
end

def attributes_from_active_job(active_job)
12 changes: 12 additions & 0 deletions app/models/solid_queue/job/concurrency_controls.rb
def blocked?
# ...
end

private
def concurrency_on_conflict
job_class.concurrency_on_conflict.to_s.inquiry
end

def acquire_concurrency_lock
return true unless concurrency_limited?

# ...
end

def release_concurrency_lock
Semaphore.signal(self)
end

def handle_concurrency_conflict
if concurrency_on_conflict.discard?
destroy
[Review comment] destroy! maybe?

[Review comment] @marelons1337 (Jul 14, 2025): Parent method is not defined with a bang, so should this one call a bang operation?
else
block
end
end

def block
BlockedExecution.create_or_find_by!(job_id: id)
end
2 changes: 1 addition & 1 deletion app/models/solid_queue/job/executable.rb
def dispatch
if acquire_concurrency_lock then ready
else
handle_concurrency_conflict
end
end

5 changes: 4 additions & 1 deletion lib/active_job/concurrency_controls.rb
module ConcurrencyControls
extend ActiveSupport::Concern

DEFAULT_CONCURRENCY_GROUP = ->(*) { self.class.name }
CONCURRENCY_ON_CONFLICT_BEHAVIOUR = %i[ block discard ]

included do
class_attribute :concurrency_key, instance_accessor: false
class_attribute :concurrency_group, default: DEFAULT_CONCURRENCY_GROUP, instance_accessor: false

class_attribute :concurrency_limit
class_attribute :concurrency_duration, default: SolidQueue.default_concurrency_control_period
class_attribute :concurrency_on_conflict, default: :block
end

class_methods do
def limits_concurrency(key:, to: 1, group: DEFAULT_CONCURRENCY_GROUP, duration: SolidQueue.default_concurrency_control_period, on_conflict: :block)
self.concurrency_key = key
self.concurrency_limit = to
self.concurrency_group = group
self.concurrency_duration = duration
self.concurrency_on_conflict = on_conflict.presence_in(CONCURRENCY_ON_CONFLICT_BEHAVIOUR) || :block
end
end

3 changes: 3 additions & 0 deletions test/dummy/app/jobs/discardable_update_result_job.rb
class DiscardableUpdateResultJob < UpdateResultJob
limits_concurrency key: ->(job_result, **) { job_result }, on_conflict: :discard
end
53 changes: 53 additions & 0 deletions test/integration/concurrency_controls_test.rb
class ConcurrencyControlsTest < ActiveSupport::TestCase
# ...

test "discard jobs when concurrency limit is reached with on_conflict: :discard" do
job1 = DiscardableUpdateResultJob.perform_later(@result, name: "1", pause: 3)
# should be discarded due to concurrency limit
job2 = DiscardableUpdateResultJob.perform_later(@result, name: "2")
# should also be discarded
job3 = DiscardableUpdateResultJob.perform_later(@result, name: "3")

wait_for_jobs_to_finish_for(5.seconds)
assert_no_unfinished_jobs

# Only the first job did something
assert_stored_sequence(@result, [ "1" ])

# All jobs have finished and have no blocked executions
jobs = SolidQueue::Job.where(active_job_id: [ job1, job2, job3 ].map(&:job_id))
assert_equal 1, jobs.count

assert_equal job1.provider_job_id, jobs.first.id
assert_nil job2.provider_job_id
assert_nil job3.provider_job_id
end

test "discard on conflict across different concurrency keys" do
another_result = JobResult.create!(queue_name: "default", status: "seq: ")
DiscardableUpdateResultJob.perform_later(@result, name: "1", pause: 0.2)
DiscardableUpdateResultJob.perform_later(another_result, name: "2", pause: 0.2)
DiscardableUpdateResultJob.perform_later(@result, name: "3") # Should be discarded
DiscardableUpdateResultJob.perform_later(another_result, name: "4") # Should be discarded

wait_for_jobs_to_finish_for(5.seconds)
assert_no_unfinished_jobs

# Only the first 2 jobs did something
assert_stored_sequence(@result, [ "1" ])
assert_stored_sequence(another_result, [ "2" ])
end

test "discard on conflict and release semaphore" do
DiscardableUpdateResultJob.perform_later(@result, name: "1", pause: 0.1)
# will be discarded
DiscardableUpdateResultJob.perform_later(@result, name: "2")

wait_for_jobs_to_finish_for(5.seconds)
assert_no_unfinished_jobs

# Enqueue another job that shouldn't be discarded or blocked
DiscardableUpdateResultJob.perform_later(@result, name: "3")
wait_for_jobs_to_finish_for(5.seconds)
assert_no_unfinished_jobs

assert_stored_sequence(@result, [ "1", "3" ])
end

private
def assert_stored_sequence(result, *sequences)
expected = sequences.map { |sequence| "seq: " + sequence.map { |name| "s#{name}c#{name}" }.join }