Skip to content

Commit 60e2f13

Browse files
committed
generic runners
1 parent 313b6d9 commit 60e2f13

File tree

2 files changed

+29
-12
lines changed

2 files changed

+29
-12
lines changed

lib/floe/runner.rb

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,13 @@ def for_resource(resource)
2727
scheme = resource.split("://").first
2828
resolve_scheme(scheme) || raise(ArgumentError, "Invalid resource scheme [#{scheme}]")
2929
end
30+
31+
def runners
32+
@runners.each_value.map do |runner|
33+
runner = runner.call if runner.kind_of?(Proc)
34+
runner
35+
end
36+
end
3037
end
3138

3239
# Run a command asynchronously and create a runner_context
@@ -75,8 +82,13 @@ def cleanup(_runner_context)
7582
raise NotImplementedError, "Must be implemented in a subclass"
7683
end
7784

78-
def wait(timeout: nil, events: %i[create update delete])
79-
raise NotImplementedError, "Must be implemented in a subclass"
80-
end
85+
# Optional Watcher for events that is run in another thread.
86+
#
87+
# @yield [event, runner_context]
88+
# @yieldparam [Symbol] event values: :create :update :delete :unknown
89+
# @yieldparam [Hash] runner_context context provided by runner
90+
# def wait(timeout: nil, events: %i[create update delete])
91+
# raise NotImplementedError, "Must be implemented in a subclass"
92+
# end
8193
end
8294
end

lib/floe/workflow.rb

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,21 @@ def wait(workflows, timeout: nil, &block)
2020
workflows = [workflows] if workflows.kind_of?(self)
2121
logger.info("checking #{workflows.count} workflows...")
2222

23-
run_until = Time.now.utc + timeout if timeout.to_i > 0
24-
ready = []
25-
queue = Queue.new
26-
wait_thread = Thread.new do
27-
loop do
28-
Runner.for_resource("docker").wait do |event, runner_context|
29-
queue.push([event, runner_context])
23+
run_until = Time.now.utc + timeout if timeout.to_i > 0
24+
ready = []
25+
queue = Queue.new
26+
wait_threads =
27+
Runner.runners.map do |runner|
28+
next unless runner.respond_to?(:wait)
29+
30+
Thread.new do
31+
loop do
32+
runner.wait do |event, runner_context|
33+
queue.push([event, runner_context])
34+
end
35+
end
3036
end
3137
end
32-
end
3338

3439
loop do
3540
ready = workflows.select(&:step_nonblock_ready?)
@@ -81,7 +86,7 @@ def wait(workflows, timeout: nil, &block)
8186
logger.info("checking #{workflows.count} workflows...Complete - #{ready.count} ready")
8287
ready
8388
ensure
84-
wait_thread&.kill
89+
wait_threads.compact.map(&:kill)
8590
end
8691
end
8792

0 commit comments

Comments
 (0)