Skip to content

Commit 131c562

Browse files
committed
generic runners
1 parent 46ed456 commit 131c562

File tree

3 files changed

+29
-12
lines changed

3 files changed

+29
-12
lines changed

lib/floe/runner.rb

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,13 @@ def for_resource(resource)
2727
scheme = resource.split("://").first
2828
resolve_scheme(scheme) || raise(ArgumentError, "Invalid resource scheme [#{scheme}]")
2929
end
30+
31+
def runners
32+
@runners.each_value.map do |runner|
33+
runner = runner.call if runner.kind_of?(Proc)
34+
runner
35+
end
36+
end
3037
end
3138

3239
# Run a command asynchronously and create a runner_context
@@ -75,8 +82,9 @@ def cleanup(_runner_context)
7582
raise NotImplementedError, "Must be implemented in a subclass"
7683
end
7784

78-
def wait(timeout: nil, events: %i[create update delete])
79-
raise NotImplementedError, "Must be implemented in a subclass"
80-
end
85+
# running in another thread, this watches for async events
86+
# def wait(timeout: nil, events: %i[create update delete])
87+
# raise NotImplementedError, "Must be implemented in a subclass"
88+
# end
8189
end
8290
end

lib/floe/workflow.rb

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,21 @@ def wait(workflows, timeout: nil, &block)
2020
workflows = [workflows] if workflows.kind_of?(self)
2121
logger.info("checking #{workflows.count} workflows...")
2222

23-
run_until = Time.now.utc + timeout if timeout.to_i > 0
24-
ready = []
25-
queue = Queue.new
26-
wait_thread = Thread.new do
27-
loop do
28-
Runner.for_resource("docker").wait do |event, runner_context|
29-
queue.push([event, runner_context])
23+
run_until = Time.now.utc + timeout if timeout.to_i > 0
24+
ready = []
25+
queue = Queue.new
26+
wait_threads =
27+
Runner.runners.map do |runner|
28+
next unless runner.respond_to?(:wait)
29+
30+
Thread.new do
31+
loop do
32+
runner.wait do |event, runner_context|
33+
queue.push([event, runner_context])
34+
end
35+
end
3036
end
3137
end
32-
end
3338

3439
loop do
3540
ready = workflows.select(&:step_nonblock_ready?)
@@ -81,7 +86,7 @@ def wait(workflows, timeout: nil, &block)
8186
logger.info("checking #{workflows.count} workflows...Complete - #{ready.count} ready")
8287
ready
8388
ensure
84-
wait_thread&.kill
89+
wait_threads.compact.map(&:kill)
8590
end
8691
end
8792

lib/floe/workflow/context.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,10 @@ def execution
2424
@context["Execution"]
2525
end
2626

27+
def object
28+
[execution["_object_type"], execution["_object_id"]&.id]
29+
end
30+
2731
def started?
2832
execution.key?("StartTime")
2933
end

0 commit comments

Comments
 (0)