From 10dc6f6664af25520924c8cb2a5af0ee8cb16fe8 Mon Sep 17 00:00:00 2001 From: Daniel Alkalai Date: Wed, 2 Apr 2014 17:18:24 -0700 Subject: [PATCH 1/6] Modifying priority queue creation to place a job with a priority that is not found in Sidekiq::Priority.priorities in the default queue, rather than creating a new one. --- lib/sidekiq/priority.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/sidekiq/priority.rb b/lib/sidekiq/priority.rb index 0eaeec4..6d6093f 100644 --- a/lib/sidekiq/priority.rb +++ b/lib/sidekiq/priority.rb @@ -18,7 +18,7 @@ def self.priorities=(priorities) end def self.queue_with_priority(queue, priority) - priority.nil? ? queue : "#{queue}_#{priority}" + priority && self.priorities.indlude?(priority) ? "#{queue}_#{priority}" : queue end end end From 2c551196312fcb08feaa2070f0798b81ce8d5101 Mon Sep 17 00:00:00 2001 From: Daniel Alkalai Date: Thu, 3 Apr 2014 09:05:27 -0700 Subject: [PATCH 2/6] Fixed typo --- lib/sidekiq/priority.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/sidekiq/priority.rb b/lib/sidekiq/priority.rb index 6d6093f..c901adb 100644 --- a/lib/sidekiq/priority.rb +++ b/lib/sidekiq/priority.rb @@ -18,7 +18,7 @@ def self.priorities=(priorities) end def self.queue_with_priority(queue, priority) - priority && self.priorities.indlude?(priority) ? "#{queue}_#{priority}" : queue + priority && self.priorities.include?(priority) ? "#{queue}_#{priority}" : queue end end end From 000b697cb36f0fb658a8029f5757274c2126aef9 Mon Sep 17 00:00:00 2001 From: Daniel Alkalai Date: Thu, 3 Apr 2014 17:31:05 -0700 Subject: [PATCH 3/6] Added spec to test giving a nil priority and to test giving a priority that is not in Sidekiq::Priority.priorities --- spec/sidekiq/worker_ext_spec.rb | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/spec/sidekiq/worker_ext_spec.rb b/spec/sidekiq/worker_ext_spec.rb index cfe9895..f98e27c 100644 --- a/spec/sidekiq/worker_ext_spec.rb +++ b/spec/sidekiq/worker_ext_spec.rb @@ -22,5 +22,26 @@ def perform(first, second) 'queue' => 'foo_high' } end + + it 'sends an item to the default queue if priority is nil' do + TestWorker.stub(:client_push) { |item| item } + item = TestWorker.perform_with_priority(:invalid_priority, 1, 2) + item.should == { + 'class' => TestWorker, + 'args' => [1, 2], + 'queue' => :foo + } + end + + it 'sends an item to the default queue if a random priority is given' do + TestWorker.stub(:client_push) { |item| item } + item = TestWorker.perform_with_priority(:random_priority, 1, 2) + item.should == { + 'class' => TestWorker, + 'args' => [1, 2], + 'queue' => :foo + } + end + end end From 75c451d3078e34fa828afaa3949c71055ad2c862 Mon Sep 17 00:00:00 2001 From: Daniel Alkalai Date: Tue, 8 Apr 2014 16:03:06 -0700 Subject: [PATCH 4/6] Adding functionality to work with sidekiq pro's reliable queueing if it is installed --- lib/sidekiq/priority.rb | 13 ++- lib/sidekiq/priority/server/fetch.rb | 89 ++++++++++++++++--- sidekiq-priority.gemspec | 2 +- spec/sidekiq/priority/server/fetch_spec.rb | 68 +++++++------- .../priority/server/reliable_fetch_spec.rb | 80 +++++++++++++++++ 5 files changed, 204 insertions(+), 48 deletions(-) create mode 100644 spec/sidekiq/priority/server/reliable_fetch_spec.rb diff --git a/lib/sidekiq/priority.rb b/lib/sidekiq/priority.rb index c901adb..2a9f0ab 100644 --- a/lib/sidekiq/priority.rb +++ b/lib/sidekiq/priority.rb @@ -23,7 +23,14 @@ def self.queue_with_priority(queue, priority) end end -Sidekiq.configure_server do |config| +if defined? Rails + class ConfigureServer < Rails::Railtie + config.after_initialize do + require "#{File.dirname(File.absolute_path(__FILE__))}/priority/server/fetch.rb" + Sidekiq::Priority::Server.configure_priority_fetch + end + end +else require "#{directory}/priority/server/fetch.rb" - Sidekiq.options[:fetch] = Sidekiq::Priority::Server::Fetch -end + Sidekiq::Priority::Server.configure_priority_fetch +end \ No newline at end of file diff --git a/lib/sidekiq/priority/server/fetch.rb b/lib/sidekiq/priority/server/fetch.rb index d4f6476..b81a6a5 100644 --- a/lib/sidekiq/priority/server/fetch.rb +++ b/lib/sidekiq/priority/server/fetch.rb @@ -4,26 +4,87 @@ module Sidekiq module Priority module Server - class Fetch < Sidekiq::BasicFetch - def initialize(options) - queues = prioritized_queues(options[:queues]) - @strictly_ordered_queues = !!options[:strict] - @queues = queues.map { |q| "queue:#{q}" } - @unique_queues = @queues.uniq + + def self.configure_priority_fetch + Sidekiq.configure_server do |config| + Sidekiq.options[:fetch] = Sidekiq::Priority::Server.priority_fetch_class + end + end + + def self.priority_fetch_class + if defined? Sidekiq::Pro::ReliableFetch + reliable_priority_fetch_class + else + basic_priority_fetch_class + end + end + + def self.reliable_priority_fetch_class + return Sidekiq::Priority::Server::ReliableFetch if defined?(Sidekiq::Priority::Server::ReliableFetch) + + Sidekiq::Priority::Server.const_set('ReliableFetch', Class.new(Sidekiq::Pro::ReliableFetch) do + def initialize(options) + @queues = prioritized_queues(options[:queues]).map {|q| ["queue:#{q}", "queue:#{q}_#{Socket.gethostname}_#{options[:index]}"] } + @algo = (@queues.length == @queues.uniq.length ? Sidekiq::Pro::ReliableFetch::Strict : Sidekiq::Pro::ReliableFetch::Weighted) + @internal = Sidekiq.redis do |conn| + bulk_reply = conn.pipelined do + @queues.each do |(_, working_queue)| + conn.lrange(working_queue, 0, -1) + end + end + memo = [] + bulk_reply.each_with_index do |vals, i| + queue = @queues[i][0] + working_queue = @queues[i][1] + memo.unshift(*vals.map do |msg| + [queue, working_queue, msg] + end) + end + memo + end + Sidekiq.logger.warn("ReliableFetch: recovering work on #{@internal.size} messages") if @internal.size > 0 + end + + protected + + def prioritized_queues(base_queues) + queues = [] + priorities = Sidekiq::Priority.priorities + priorities.each do |priority| + base_queues.each do |queue| + queues << Sidekiq::Priority.queue_with_priority(queue, priority) + end + end + queues + end end + ) + end + + def self.basic_priority_fetch_class + Sidekiq::Priority::Server.const_set('BasicFetch', Class.new(Sidekiq::BasicFetch) do + + def initialize(options) + queues = prioritized_queues(options[:queues]) + @strictly_ordered_queues = !!options[:strict] + @queues = queues.map { |q| "queue:#{q}" } + @unique_queues = @queues.uniq + end - protected + protected - def prioritized_queues(base_queues) - queues = [] - priorities = Sidekiq::Priority.priorities - priorities.each do |priority| - base_queues.each do |queue| - queues << Sidekiq::Priority.queue_with_priority(queue, priority) + def prioritized_queues(base_queues) + queues = [] + priorities = Sidekiq::Priority.priorities + priorities.each do |priority| + base_queues.each do |queue| + queues << Sidekiq::Priority.queue_with_priority(queue, priority) + end end + queues end - queues end + ) end end end diff --git a/sidekiq-priority.gemspec b/sidekiq-priority.gemspec index b729ea5..8b5f92e 100644 --- a/sidekiq-priority.gemspec +++ b/sidekiq-priority.gemspec @@ -17,5 +17,5 @@ Gem::Specification.new do |s| s.add_development_dependency 'simplecov' s.add_development_dependency 'simplecov-rcov' - s.add_dependency 'sidekiq', '>= 2.1.0' + s.add_dependency 'sidekiq', '~> 2.17.0' end diff --git a/spec/sidekiq/priority/server/fetch_spec.rb b/spec/sidekiq/priority/server/fetch_spec.rb index 1593250..fde53b7 100644 --- a/spec/sidekiq/priority/server/fetch_spec.rb +++ b/spec/sidekiq/priority/server/fetch_spec.rb @@ -1,39 +1,47 @@ require 'spec_helper' require 'sidekiq/priority/server/fetch' -describe Sidekiq::Priority::Server::Fetch do +describe Sidekiq::Priority::Server do describe '#initialize' do - before :all do - options = { - queues: ['foo', 'bar', 'foo'], - strict: true - } - @fetch = Sidekiq::Priority::Server::Fetch.new(options) - end - it 'sets queues' do - @fetch.instance_variable_get(:@queues).should == [ - 'queue:foo_high', - 'queue:bar_high', - 'queue:foo_high', - 'queue:foo', - 'queue:bar', - 'queue:foo', - 'queue:foo_low', - 'queue:bar_low', - 'queue:foo_low' - ] - end + context 'when sidekiq pro is not installed' do + + before(:all) do + if defined? Sidekiq::Pro::ReliableFetch + Sidekiq::Pro.send(:remove_const, :ReliableFetch) + end + end + + it 'should use the basic priority fetch class' do + expect(Sidekiq::Priority::Server.priority_fetch_class).to eq(Sidekiq::Priority::Server::BasicFetch) + end + + let(:fetch) { Sidekiq::Priority::Server.basic_priority_fetch_class.new(queues: %w(foo bar foo), strict: true, index: 1) } + + it 'sets queues' do + expect(fetch.instance_variable_get(:@queues)).to eq([ + 'queue:foo_high', + 'queue:bar_high', + 'queue:foo_high', + 'queue:foo', + 'queue:bar', + 'queue:foo', + 'queue:foo_low', + 'queue:bar_low', + 'queue:foo_low' + ]) + end - it 'sets unique_queues' do - @fetch.instance_variable_get(:@unique_queues).should == [ - 'queue:foo_high', - 'queue:bar_high', - 'queue:foo', - 'queue:bar', - 'queue:foo_low', - 'queue:bar_low' - ] + it 'sets unique_queues' do + expect(fetch.instance_variable_get(:@unique_queues)).to eq([ + 'queue:foo_high', + 'queue:bar_high', + 'queue:foo', + 'queue:bar', + 'queue:foo_low', + 'queue:bar_low' + ]) + end end end end diff --git a/spec/sidekiq/priority/server/reliable_fetch_spec.rb b/spec/sidekiq/priority/server/reliable_fetch_spec.rb new file mode 100644 index 0000000..abe17f8 --- /dev/null +++ b/spec/sidekiq/priority/server/reliable_fetch_spec.rb @@ -0,0 +1,80 @@ +require 'spec_helper' +require 'sidekiq/priority/server/fetch' + +describe Sidekiq::Priority::Server do + describe '#initialize' do + context 'when sidekiq pro is installed' do + + before(:all) do + module Sidekiq + module Pro + class ReliableFetch + class Strict + end + class Weighted + end + end + end + end + end + + let(:fetch) { Sidekiq::Priority::Server.reliable_priority_fetch_class.new(queues: %w(foo bar), strict: true, index: 1)} + + it 'should use the reliable priority fetch class' do + expect(Sidekiq::Priority::Server.priority_fetch_class).to eq(Sidekiq::Priority::Server::ReliableFetch) + end + + it 'sets unique queues' do + require 'socket' + expected_list = [ + 'queue:foo_high', + 'queue:bar_high', + 'queue:foo', + 'queue:bar', + 'queue:foo_low', + 'queue:bar_low' + ].map{ |queue| [queue, "#{queue}_#{Socket.gethostname}_1"] } + + expect(fetch.instance_variable_get(:@queues)).to eq(expected_list) + end + + it 'sets the algorithm variable' do + expect(fetch.instance_variable_get(:@algo)).to eq(Sidekiq::Pro::ReliableFetch::Strict) + end + + it 'sets the internal variable' do + expect(fetch.instance_variable_get(:@internal)).to eq([]) + end + + context 'and redundant queues are specified' do + + let(:fetch) { Sidekiq::Priority::Server.reliable_priority_fetch_class.new(queues: %w(foo bar foo), strict: true, index: 1)} + + it 'sets queues' do + require 'socket' + expected_list = [ + 'queue:foo_high', + 'queue:bar_high', + 'queue:foo_high', + 'queue:foo', + 'queue:bar', + 'queue:foo', + 'queue:foo_low', + 'queue:bar_low', + 'queue:foo_low' + ].map{ |queue| [queue, "#{queue}_#{Socket.gethostname}_1"] } + + expect(fetch.instance_variable_get(:@queues)).to eq(expected_list) + end + + it 'sets the algorithm variable' do + expect(fetch.instance_variable_get(:@algo)).to eq(Sidekiq::Pro::ReliableFetch::Weighted) + end + + it 'sets the internal variable' do + expect(fetch.instance_variable_get(:@internal)).to eq([]) + end + end + end + end +end \ No newline at end of file From 8060dc88d1c929e45cbcfef38ff10438f59194be Mon Sep 17 00:00:00 2001 From: Daniel Alkalai Date: Tue, 8 Apr 2014 17:10:08 -0700 Subject: [PATCH 5/6] Refactored out a common function --- lib/sidekiq/priority/server/fetch.rb | 32 ++++++++++++++-------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/lib/sidekiq/priority/server/fetch.rb b/lib/sidekiq/priority/server/fetch.rb index b81a6a5..67050c7 100644 --- a/lib/sidekiq/priority/server/fetch.rb +++ b/lib/sidekiq/priority/server/fetch.rb @@ -19,6 +19,18 @@ def self.priority_fetch_class end end + def self.prioritized_queues(base_queues) + queues = [] + priorities = Sidekiq::Priority.priorities + priorities.each do |priority| + base_queues.each do |queue| + queues << Sidekiq::Priority.queue_with_priority(queue, priority) + end + end + queues + end + + def self.reliable_priority_fetch_class return Sidekiq::Priority::Server::ReliableFetch if defined?(Sidekiq::Priority::Server::ReliableFetch) @@ -48,20 +60,15 @@ def initialize(options) protected def prioritized_queues(base_queues) - queues = [] - priorities = Sidekiq::Priority.priorities - priorities.each do |priority| - base_queues.each do |queue| - queues << Sidekiq::Priority.queue_with_priority(queue, priority) - end - end - queues + Sidekiq::Priority::Server.prioritized_queues(base_queues) end end ) end def self.basic_priority_fetch_class + return Sidekiq::Priority::Server::BasicFetch if defined?(Sidekiq::Priority::Server::BasicFetch) + Sidekiq::Priority::Server.const_set('BasicFetch', Class.new(Sidekiq::BasicFetch) do def initialize(options) @@ -74,14 +81,7 @@ def initialize(options) protected def prioritized_queues(base_queues) - queues = [] - priorities = Sidekiq::Priority.priorities - priorities.each do |priority| - base_queues.each do |queue| - queues << Sidekiq::Priority.queue_with_priority(queue, priority) - end - end - queues + Sidekiq::Priority::Server.prioritized_queues(base_queues) end end ) From 6b5091bbf2d24422a4a785611f3b27fe3ff723e8 Mon Sep 17 00:00:00 2001 From: Daniel Alkalai Date: Tue, 15 Apr 2014 19:19:42 -0700 Subject: [PATCH 6/6] Modifyiing travis.yml file to include redis-server --- .travis.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.travis.yml b/.travis.yml index 17a61fa..b06445e 100644 --- a/.travis.yml +++ b/.travis.yml @@ -2,3 +2,5 @@ language: ruby rvm: - 2.0.0 - 1.9.3 +services: + - redis-server \ No newline at end of file