Skip to content

Commit 0f6f683

Browse files
committed
Implement runner#wait in a generic way
1 parent a809ce2 commit 0f6f683

File tree

2 files changed

+26
-13
lines changed

2 files changed

+26
-13
lines changed

lib/floe/runner.rb

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ def register_scheme(scheme, klass_or_proc)
1717

1818
private def resolve_scheme(scheme)
1919
runner = @runners[scheme]
20-
runner = @runners[scheme] = @runners[scheme].call if runner.is_a?(Proc)
20+
runner = @runners[scheme] = @runners[scheme].call if runner.kind_of?(Proc)
2121
runner
2222
end
2323

@@ -27,6 +27,13 @@ def for_resource(resource)
2727
scheme = resource.split("://").first
2828
resolve_scheme(scheme) || raise(ArgumentError, "Invalid resource scheme [#{scheme}]")
2929
end
30+
31+
def runners
32+
@runners.each_value.map do |runner|
33+
runner = runner.call if runner.kind_of?(Proc)
34+
runner
35+
end
36+
end
3037
end
3138

3239
# Run a command asynchronously and create a runner_context
@@ -75,8 +82,9 @@ def cleanup(_runner_context)
7582
raise NotImplementedError, "Must be implemented in a subclass"
7683
end
7784

78-
def wait(timeout: nil, events: %i[create update delete])
79-
raise NotImplementedError, "Must be implemented in a subclass"
80-
end
85+
# Implement if the runner needs a watch for async events
86+
# def wait(timeout: nil, events: %i[create update delete])
87+
# raise NotImplementedError, "Must be implemented in a subclass"
88+
# end
8189
end
8290
end

lib/floe/workflow.rb

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,21 @@ def wait(workflows, timeout: nil, &block)
2020
workflows = [workflows] if workflows.kind_of?(self)
2121
logger.info("checking #{workflows.count} workflows...")
2222

23-
run_until = Time.now.utc + timeout if timeout.to_i > 0
24-
ready = []
25-
queue = Queue.new
26-
wait_thread = Thread.new do
27-
loop do
28-
Runner.for_resource("docker").wait do |event, runner_context|
29-
queue.push([event, runner_context])
23+
run_until = Time.now.utc + timeout if timeout.to_i > 0
24+
ready = []
25+
queue = Queue.new
26+
wait_threads =
27+
Runner.runners.map do |runner|
28+
next unless runner.respond_to?(:wait)
29+
30+
Thread.new do
31+
loop do
32+
runner.wait do |event, runner_context|
33+
queue.push([event, runner_context])
34+
end
35+
end
3036
end
3137
end
32-
end
3338

3439
loop do
3540
ready = workflows.select(&:step_nonblock_ready?)
@@ -81,7 +86,7 @@ def wait(workflows, timeout: nil, &block)
8186
logger.info("checking #{workflows.count} workflows...Complete - #{ready.count} ready")
8287
ready
8388
ensure
84-
wait_thread&.kill
89+
wait_threads.compact.map(&:kill)
8590
end
8691
end
8792

0 commit comments

Comments
 (0)