From 8ed8623b42431815cfbeefc349d6c83ef91bdbd1 Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Tue, 22 Jul 2025 19:43:05 +0200 Subject: [PATCH] Make concurrency tests less flaky Don't try to enforce sequential order for jobs, given that the order is not even guaranteed by Solid Queue. Rename the test job to reflect that. Fix also another test where we waited for a job to finish but not for it to release the semaphore, so it happened most of the times but sometimes it didn't. --- ...b => non_overlapping_update_result_job.rb} | 2 +- test/dummy/app/jobs/update_result_job.rb | 1 + test/integration/concurrency_controls_test.rb | 56 ++++++++++--------- test/integration/instrumentation_test.rb | 8 +-- 4 files changed, 37 insertions(+), 30 deletions(-) rename test/dummy/app/jobs/{sequential_update_result_job.rb => non_overlapping_update_result_job.rb} (54%) diff --git a/test/dummy/app/jobs/sequential_update_result_job.rb b/test/dummy/app/jobs/non_overlapping_update_result_job.rb similarity index 54% rename from test/dummy/app/jobs/sequential_update_result_job.rb rename to test/dummy/app/jobs/non_overlapping_update_result_job.rb index a3afa33f..79ab98e5 100644 --- a/test/dummy/app/jobs/sequential_update_result_job.rb +++ b/test/dummy/app/jobs/non_overlapping_update_result_job.rb @@ -1,3 +1,3 @@ -class SequentialUpdateResultJob < UpdateResultJob +class NonOverlappingUpdateResultJob < UpdateResultJob limits_concurrency key: ->(job_result, **) { job_result } end diff --git a/test/dummy/app/jobs/update_result_job.rb b/test/dummy/app/jobs/update_result_job.rb index 04571eb6..fb016de0 100644 --- a/test/dummy/app/jobs/update_result_job.rb +++ b/test/dummy/app/jobs/update_result_job.rb @@ -1,5 +1,6 @@ class UpdateResultJob < ApplicationJob def perform(job_result, name:, pause: nil, exception: nil) + job_result.status += " + " unless job_result.status.blank? job_result.status += "s#{name}" sleep(pause) if pause diff --git a/test/integration/concurrency_controls_test.rb b/test/integration/concurrency_controls_test.rb index f0984078..178c796d 100644 --- a/test/integration/concurrency_controls_test.rb +++ b/test/integration/concurrency_controls_test.rb @@ -6,7 +6,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase self.use_transactional_tests = false setup do - @result = JobResult.create!(queue_name: "default", status: "seq: ") + @result = JobResult.create!(queue_name: "default", status: "") default_worker = { queues: "default", polling_interval: 0.1, processes: 3, threads: 2 } dispatcher = { polling_interval: 0.1, batch_size: 200, concurrency_maintenance_interval: 1 } @@ -20,13 +20,13 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase terminate_process(@pid) if process_exists?(@pid) end - test "run several conflicting jobs over the same record sequentially" do + test "run several conflicting jobs over the same record without overlapping" do ("A".."F").each do |name| - SequentialUpdateResultJob.perform_later(@result, name: name, pause: 0.2.seconds) + NonOverlappingUpdateResultJob.perform_later(@result, name: name, pause: 0.2.seconds) end ("G".."K").each do |name| - SequentialUpdateResultJob.perform_later(@result, name: name) + NonOverlappingUpdateResultJob.perform_later(@result, name: name) end wait_for_jobs_to_finish_for(5.seconds) @@ -39,11 +39,11 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase UpdateResultJob.set(wait: 0.23.seconds).perform_later(@result, name: "000", pause: 0.1.seconds) ("A".."F").each_with_index do |name, i| - SequentialUpdateResultJob.set(wait: (0.2 + i * 0.01).seconds).perform_later(@result, name: name, pause: 0.3.seconds) + NonOverlappingUpdateResultJob.set(wait: (0.2 + i * 0.01).seconds).perform_later(@result, name: name, pause: 0.3.seconds) end ("G".."K").each_with_index do |name, i| - SequentialUpdateResultJob.set(wait: (0.3 + i * 0.01).seconds).perform_later(@result, name: name) + NonOverlappingUpdateResultJob.set(wait: (0.3 + i * 0.01).seconds).perform_later(@result, name: name) end wait_for_jobs_to_finish_for(5.seconds) @@ -85,11 +85,11 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase test "run several jobs over the same record sequentially, with some of them failing" do ("A".."F").each_with_index do |name, i| # A, C, E will fail, for i= 0, 2, 4 - SequentialUpdateResultJob.perform_later(@result, name: name, pause: 0.2.seconds, exception: (ExpectedTestError if i.even?)) + NonOverlappingUpdateResultJob.perform_later(@result, name: name, pause: 0.2.seconds, exception: (ExpectedTestError if i.even?)) end ("G".."K").each do |name| - SequentialUpdateResultJob.perform_later(@result, name: name) + NonOverlappingUpdateResultJob.perform_later(@result, name: name) end wait_for_jobs_to_finish_for(5.seconds) @@ -100,7 +100,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase test "rely on dispatcher to unblock blocked executions with an available semaphore" do # Simulate a scenario where we got an available semaphore and some stuck jobs - job = SequentialUpdateResultJob.perform_later(@result, name: "A") + job = NonOverlappingUpdateResultJob.perform_later(@result, name: "A") wait_for_jobs_to_finish_for(5.seconds) assert_no_unfinished_jobs @@ -114,7 +114,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase # Now enqueue more jobs under that same key. They'll be all locked assert_difference -> { SolidQueue::BlockedExecution.count }, +10 do ("B".."K").each do |name| - SequentialUpdateResultJob.perform_later(@result, name: name) + NonOverlappingUpdateResultJob.perform_later(@result, name: name) end end @@ -127,14 +127,12 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase wait_for_jobs_to_finish_for(5.seconds) assert_no_unfinished_jobs - # We can't ensure the order between B and C, because it depends on which worker wins when - # unblocking, as one will try to unblock B and another C - assert_stored_sequence @result, ("A".."K").to_a, [ "A", "C", "B" ] + ("D".."K").to_a + assert_stored_sequence @result, ("A".."K").to_a end test "rely on dispatcher to unblock blocked executions with an expired semaphore" do # Simulate a scenario where we got an available semaphore and some stuck jobs - job = SequentialUpdateResultJob.perform_later(@result, name: "A") + job = NonOverlappingUpdateResultJob.perform_later(@result, name: "A") wait_for_jobs_to_finish_for(5.seconds) assert_no_unfinished_jobs @@ -147,7 +145,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase # Now enqueue more jobs under that same key. They'll be all locked assert_difference -> { SolidQueue::BlockedExecution.count }, +10 do ("B".."K").each do |name| - SequentialUpdateResultJob.perform_later(@result, name: name) + NonOverlappingUpdateResultJob.perform_later(@result, name: name) end end @@ -159,13 +157,11 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase wait_for_jobs_to_finish_for(5.seconds) assert_no_unfinished_jobs - # We can't ensure the order between B and C, because it depends on which worker wins when - # unblocking, as one will try to unblock B and another C - assert_stored_sequence @result, ("A".."K").to_a, [ "A", "C", "B" ] + ("D".."K").to_a + assert_stored_sequence @result, ("A".."K").to_a end test "don't block claimed executions that get released" do - SequentialUpdateResultJob.perform_later(@result, name: "I'll be released to ready", pause: SolidQueue.shutdown_timeout + 10.seconds) + NonOverlappingUpdateResultJob.perform_later(@result, name: "I'll be released to ready", pause: SolidQueue.shutdown_timeout + 10.seconds) job = SolidQueue::Job.last sleep(0.2) @@ -184,8 +180,8 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase skip if Rails::VERSION::MAJOR == 7 && Rails::VERSION::MINOR == 2 ActiveRecord::Base.transaction do - SequentialUpdateResultJob.perform_later(@result, name: "A", pause: 0.2.seconds) - SequentialUpdateResultJob.perform_later(@result, name: "B") + NonOverlappingUpdateResultJob.perform_later(@result, name: "A", pause: 0.2.seconds) + NonOverlappingUpdateResultJob.perform_later(@result, name: "B") begin assert_equal 2, SolidQueue::Job.count @@ -219,7 +215,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase end test "discard on conflict across different concurrency keys" do - another_result = JobResult.create!(queue_name: "default", status: "seq: ") + another_result = JobResult.create!(queue_name: "default", status: "") DiscardableUpdateResultJob.perform_later(@result, name: "1", pause: 0.2) DiscardableUpdateResultJob.perform_later(another_result, name: "2", pause: 0.2) DiscardableUpdateResultJob.perform_later(@result, name: "3") # Should be discarded @@ -239,6 +235,8 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase DiscardableUpdateResultJob.perform_later(@result, name: "2") wait_for_jobs_to_finish_for(5.seconds) + wait_for_semaphores_to_be_released_for(2.seconds) + assert_no_unfinished_jobs # Enqueue another job that shouldn't be discarded or blocked @@ -250,10 +248,18 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase end private - def assert_stored_sequence(result, *sequences) - expected = sequences.map { |sequence| "seq: " + sequence.map { |name| "s#{name}c#{name}" }.join } + def assert_stored_sequence(result, sequence) + expected = sequence.sort.map { |name| "s#{name}c#{name}" }.join skip_active_record_query_cache do - assert_includes expected, result.reload.status + assert_equal expected, result.reload.status.split(" + ").sort.join + end + end + + def wait_for_semaphores_to_be_released_for(timeout) + wait_while_with_timeout(timeout) do + skip_active_record_query_cache do + SolidQueue::Semaphore.available.invert_where.any? + end end end end diff --git a/test/integration/instrumentation_test.rb b/test/integration/instrumentation_test.rb index 95dadb19..d6a039dd 100644 --- a/test/integration/instrumentation_test.rb +++ b/test/integration/instrumentation_test.rb @@ -234,7 +234,7 @@ class InstrumentationTest < ActiveSupport::TestCase 5.times { AddToBufferJob.perform_later("A") } # 1 ready + 3 blocked result = JobResult.create! - 4.times { SequentialUpdateResultJob.perform_later(result, name: "A") } + 4.times { NonOverlappingUpdateResultJob.perform_later(result, name: "A") } events = subscribed("discard_all.solid_queue") do SolidQueue::ReadyExecution.discard_all_from_jobs(SolidQueue::Job.all) @@ -261,7 +261,7 @@ class InstrumentationTest < ActiveSupport::TestCase test "unblocking job emits release_blocked event" do result = JobResult.create! # 1 ready, 2 blocked - 3.times { SequentialUpdateResultJob.perform_later(result, name: "A") } + 3.times { NonOverlappingUpdateResultJob.perform_later(result, name: "A") } # Simulate expiry of the concurrency locks travel_to 3.days.from_now @@ -283,11 +283,11 @@ class InstrumentationTest < ActiveSupport::TestCase test "unblocking jobs in bulk emits release_many_blocked event" do result = JobResult.create! # 1 ready, 3 blocked - 4.times { SequentialUpdateResultJob.perform_later(result, name: "A") } + 4.times { NonOverlappingUpdateResultJob.perform_later(result, name: "A") } # 1 ready, 2 blocked result = JobResult.create! - 3.times { SequentialUpdateResultJob.perform_later(result, name: "B") } + 3.times { NonOverlappingUpdateResultJob.perform_later(result, name: "B") } # Simulate expiry of the concurrency locks travel_to 3.days.from_now