Skip to content

Make concurrency tests less flaky #605

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
class SequentialUpdateResultJob < UpdateResultJob
class NonOverlappingUpdateResultJob < UpdateResultJob
limits_concurrency key: ->(job_result, **) { job_result }
end
1 change: 1 addition & 0 deletions test/dummy/app/jobs/update_result_job.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
class UpdateResultJob < ApplicationJob
def perform(job_result, name:, pause: nil, exception: nil)
job_result.status += " + " unless job_result.status.blank?
job_result.status += "s#{name}"

sleep(pause) if pause
Expand Down
56 changes: 31 additions & 25 deletions test/integration/concurrency_controls_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
self.use_transactional_tests = false

setup do
@result = JobResult.create!(queue_name: "default", status: "seq: ")
@result = JobResult.create!(queue_name: "default", status: "")

default_worker = { queues: "default", polling_interval: 0.1, processes: 3, threads: 2 }
dispatcher = { polling_interval: 0.1, batch_size: 200, concurrency_maintenance_interval: 1 }
Expand All @@ -20,13 +20,13 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
terminate_process(@pid) if process_exists?(@pid)
end

test "run several conflicting jobs over the same record sequentially" do
test "run several conflicting jobs over the same record without overlapping" do
("A".."F").each do |name|
SequentialUpdateResultJob.perform_later(@result, name: name, pause: 0.2.seconds)
NonOverlappingUpdateResultJob.perform_later(@result, name: name, pause: 0.2.seconds)
end

("G".."K").each do |name|
SequentialUpdateResultJob.perform_later(@result, name: name)
NonOverlappingUpdateResultJob.perform_later(@result, name: name)
end

wait_for_jobs_to_finish_for(5.seconds)
Expand All @@ -39,11 +39,11 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
UpdateResultJob.set(wait: 0.23.seconds).perform_later(@result, name: "000", pause: 0.1.seconds)

("A".."F").each_with_index do |name, i|
SequentialUpdateResultJob.set(wait: (0.2 + i * 0.01).seconds).perform_later(@result, name: name, pause: 0.3.seconds)
NonOverlappingUpdateResultJob.set(wait: (0.2 + i * 0.01).seconds).perform_later(@result, name: name, pause: 0.3.seconds)
end

("G".."K").each_with_index do |name, i|
SequentialUpdateResultJob.set(wait: (0.3 + i * 0.01).seconds).perform_later(@result, name: name)
NonOverlappingUpdateResultJob.set(wait: (0.3 + i * 0.01).seconds).perform_later(@result, name: name)
end

wait_for_jobs_to_finish_for(5.seconds)
Expand Down Expand Up @@ -85,11 +85,11 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
test "run several jobs over the same record sequentially, with some of them failing" do
("A".."F").each_with_index do |name, i|
# A, C, E will fail, for i= 0, 2, 4
SequentialUpdateResultJob.perform_later(@result, name: name, pause: 0.2.seconds, exception: (ExpectedTestError if i.even?))
NonOverlappingUpdateResultJob.perform_later(@result, name: name, pause: 0.2.seconds, exception: (ExpectedTestError if i.even?))
end

("G".."K").each do |name|
SequentialUpdateResultJob.perform_later(@result, name: name)
NonOverlappingUpdateResultJob.perform_later(@result, name: name)
end

wait_for_jobs_to_finish_for(5.seconds)
Expand All @@ -100,7 +100,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase

test "rely on dispatcher to unblock blocked executions with an available semaphore" do
# Simulate a scenario where we got an available semaphore and some stuck jobs
job = SequentialUpdateResultJob.perform_later(@result, name: "A")
job = NonOverlappingUpdateResultJob.perform_later(@result, name: "A")

wait_for_jobs_to_finish_for(5.seconds)
assert_no_unfinished_jobs
Expand All @@ -114,7 +114,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
# Now enqueue more jobs under that same key. They'll be all locked
assert_difference -> { SolidQueue::BlockedExecution.count }, +10 do
("B".."K").each do |name|
SequentialUpdateResultJob.perform_later(@result, name: name)
NonOverlappingUpdateResultJob.perform_later(@result, name: name)
end
end

Expand All @@ -127,14 +127,12 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
wait_for_jobs_to_finish_for(5.seconds)
assert_no_unfinished_jobs

# We can't ensure the order between B and C, because it depends on which worker wins when
# unblocking, as one will try to unblock B and another C
assert_stored_sequence @result, ("A".."K").to_a, [ "A", "C", "B" ] + ("D".."K").to_a
assert_stored_sequence @result, ("A".."K").to_a
end

test "rely on dispatcher to unblock blocked executions with an expired semaphore" do
# Simulate a scenario where we got an available semaphore and some stuck jobs
job = SequentialUpdateResultJob.perform_later(@result, name: "A")
job = NonOverlappingUpdateResultJob.perform_later(@result, name: "A")
wait_for_jobs_to_finish_for(5.seconds)
assert_no_unfinished_jobs

Expand All @@ -147,7 +145,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
# Now enqueue more jobs under that same key. They'll be all locked
assert_difference -> { SolidQueue::BlockedExecution.count }, +10 do
("B".."K").each do |name|
SequentialUpdateResultJob.perform_later(@result, name: name)
NonOverlappingUpdateResultJob.perform_later(@result, name: name)
end
end

Expand All @@ -159,13 +157,11 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
wait_for_jobs_to_finish_for(5.seconds)
assert_no_unfinished_jobs

# We can't ensure the order between B and C, because it depends on which worker wins when
# unblocking, as one will try to unblock B and another C
assert_stored_sequence @result, ("A".."K").to_a, [ "A", "C", "B" ] + ("D".."K").to_a
assert_stored_sequence @result, ("A".."K").to_a
end

test "don't block claimed executions that get released" do
SequentialUpdateResultJob.perform_later(@result, name: "I'll be released to ready", pause: SolidQueue.shutdown_timeout + 10.seconds)
NonOverlappingUpdateResultJob.perform_later(@result, name: "I'll be released to ready", pause: SolidQueue.shutdown_timeout + 10.seconds)
job = SolidQueue::Job.last

sleep(0.2)
Expand All @@ -184,8 +180,8 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
skip if Rails::VERSION::MAJOR == 7 && Rails::VERSION::MINOR == 2

ActiveRecord::Base.transaction do
SequentialUpdateResultJob.perform_later(@result, name: "A", pause: 0.2.seconds)
SequentialUpdateResultJob.perform_later(@result, name: "B")
NonOverlappingUpdateResultJob.perform_later(@result, name: "A", pause: 0.2.seconds)
NonOverlappingUpdateResultJob.perform_later(@result, name: "B")

begin
assert_equal 2, SolidQueue::Job.count
Expand Down Expand Up @@ -219,7 +215,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
end

test "discard on conflict across different concurrency keys" do
another_result = JobResult.create!(queue_name: "default", status: "seq: ")
another_result = JobResult.create!(queue_name: "default", status: "")
DiscardableUpdateResultJob.perform_later(@result, name: "1", pause: 0.2)
DiscardableUpdateResultJob.perform_later(another_result, name: "2", pause: 0.2)
DiscardableUpdateResultJob.perform_later(@result, name: "3") # Should be discarded
Expand All @@ -239,6 +235,8 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
DiscardableUpdateResultJob.perform_later(@result, name: "2")

wait_for_jobs_to_finish_for(5.seconds)
wait_for_semaphores_to_be_released_for(2.seconds)

assert_no_unfinished_jobs

# Enqueue another job that shouldn't be discarded or blocked
Expand All @@ -250,10 +248,18 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
end

private
def assert_stored_sequence(result, *sequences)
expected = sequences.map { |sequence| "seq: " + sequence.map { |name| "s#{name}c#{name}" }.join }
def assert_stored_sequence(result, sequence)
expected = sequence.sort.map { |name| "s#{name}c#{name}" }.join
skip_active_record_query_cache do
assert_includes expected, result.reload.status
assert_equal expected, result.reload.status.split(" + ").sort.join
end
end

def wait_for_semaphores_to_be_released_for(timeout)
wait_while_with_timeout(timeout) do
skip_active_record_query_cache do
SolidQueue::Semaphore.available.invert_where.any?
end
end
end
end
8 changes: 4 additions & 4 deletions test/integration/instrumentation_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ class InstrumentationTest < ActiveSupport::TestCase
5.times { AddToBufferJob.perform_later("A") }
# 1 ready + 3 blocked
result = JobResult.create!
4.times { SequentialUpdateResultJob.perform_later(result, name: "A") }
4.times { NonOverlappingUpdateResultJob.perform_later(result, name: "A") }

events = subscribed("discard_all.solid_queue") do
SolidQueue::ReadyExecution.discard_all_from_jobs(SolidQueue::Job.all)
Expand All @@ -261,7 +261,7 @@ class InstrumentationTest < ActiveSupport::TestCase
test "unblocking job emits release_blocked event" do
result = JobResult.create!
# 1 ready, 2 blocked
3.times { SequentialUpdateResultJob.perform_later(result, name: "A") }
3.times { NonOverlappingUpdateResultJob.perform_later(result, name: "A") }

# Simulate expiry of the concurrency locks
travel_to 3.days.from_now
Expand All @@ -283,11 +283,11 @@ class InstrumentationTest < ActiveSupport::TestCase
test "unblocking jobs in bulk emits release_many_blocked event" do
result = JobResult.create!
# 1 ready, 3 blocked
4.times { SequentialUpdateResultJob.perform_later(result, name: "A") }
4.times { NonOverlappingUpdateResultJob.perform_later(result, name: "A") }

# 1 ready, 2 blocked
result = JobResult.create!
3.times { SequentialUpdateResultJob.perform_later(result, name: "B") }
3.times { NonOverlappingUpdateResultJob.perform_later(result, name: "B") }

# Simulate expiry of the concurrency locks
travel_to 3.days.from_now
Expand Down