Skip to content

Commit 61d4123

Browse files
committed
Async version of the awesome runner
1 parent 71401dd commit 61d4123

File tree

2 files changed

+66
-3
lines changed

2 files changed

+66
-3
lines changed

lib/floe/awesome_runner.rb

Lines changed: 64 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,59 @@
11
# frozen_string_literal: true
22

3+
require "concurrent/array"
4+
35
module Floe
6+
class AwesomeProcess < Thread
7+
attr_reader :result
8+
attr_accessor :error
9+
10+
def initialize(queue, context, *args)
11+
self.report_on_exception = true
12+
@processed = false
13+
@context = context
14+
15+
# Don't like changing the value of context here,
16+
# but want to make sure thread is set before the `queue.push`
17+
# `queue.pop` will look potentially at status, which is through thread
18+
context["thread"] = self
19+
20+
super do
21+
@result = AwesomeSpawn.run(*args)
22+
23+
# this is changing the value of the context
24+
# in the non-main thread
25+
# Potential race condition here
26+
Floe::AwesomeRunner.populate_results!(@context, :result => @result)
27+
28+
# trigger an event
29+
queue.push(["delete", context])
30+
rescue => err
31+
# Shouldn't ever get in here
32+
@error = err
33+
34+
Floe::AwesomeRunner.populate_results!(@context, :error => err)
35+
36+
# trigger an event
37+
queue.push(["delete", context])
38+
end
39+
end
40+
end
41+
442
class AwesomeRunner < Floe::Runner
543
SCHEME = "awesome"
644
SCHEME_PREFIX = "#{SCHEME}://"
745
SCHEME_OFFSET = SCHEME.length + 3
846

47+
# only exposed for tests
48+
# use wait instead
49+
attr_reader :queue
50+
951
def initialize(_options = {})
1052
require "awesome_spawn"
1153

54+
# events triggered
55+
@queue = Queue.new
56+
1257
super
1358
end
1459

@@ -21,13 +66,18 @@ def run_async!(resource, params = {}, _secrets = {}, _context = {})
2166

2267
runner_context = {}
2368

24-
# TODO: fix sanitization preventing params in args (e.g.: $PARAM1 => \$PARAM1)
25-
result = AwesomeSpawn.run(method, :env => params, :params => args)
26-
self.class.populate_results!(runner_context, :result => result)
69+
# NOTE: this adds itself to the runner_context
70+
AwesomeProcess.new(@queue, runner_context, method, :env => params, :params => args)
71+
2772
runner_context
2873
end
2974

3075
def status!(runner_context)
76+
# check if it has no output (i.e.: we think it is running) but it is not running
77+
if !runner_context.key?("Output") && !runner_context["thread"]&.alive?
78+
runner_context["Output"] = {"Error" => "Lambda.Unknown", "Cause" => "no output and no thread"}
79+
runner_context["Error"] = true
80+
end
3181
end
3282

3383
def running?(runner_context)
@@ -43,6 +93,17 @@ def output(runner_context)
4393
end
4494

4595
def cleanup(runner_context)
96+
runner_context["thread"] = nil
97+
end
98+
99+
def wait(timeout: nil, _events: %i[create update delete])
100+
# TODO: implement whole interface
101+
raise "wait needs a block and doesn't support timeout" unless timeout.nil? && block_given?
102+
103+
loop do
104+
event_context = @queue.pop
105+
yield event_context if block_given?
106+
end
46107
end
47108

48109
# internal methods

spec/awesome_runner_spec.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,14 @@
2020
stub_good_run("ls", :params => [], :env => {}, :output => "file\nlisting\n")
2121

2222
subject.run_async!("awesome://ls")
23+
subject.queue.pop
2324
end
2425

2526
it "passes environment variables to command run" do
2627
stub_good_run("ls", :params => [], :env => {"FOO" => "BAR"}, :output => "file\nlisting\n")
2728

2829
subject.run_async!("awesome://ls", {"FOO" => "BAR"})
30+
subject.queue.pop
2931
end
3032
end
3133

0 commit comments

Comments
 (0)