12
12
# See the License for the specific language governing permissions and
13
13
# limitations under the License.
14
14
15
+ import io
15
16
import os
16
17
import time
17
18
import unittest
18
19
19
20
from launch import LaunchDescription
20
21
from launch import LaunchService
21
- from launch.actions import EmitEvent
22
- from launch.actions import ExecuteProcess
23
- from launch.actions import RegisterEventHandler
24
- from launch.event_handlers import OnProcessIO
25
- from launch.events import Shutdown
22
+ import launch.actions
23
+ import launch.event_handlers
24
+ import launch.events
26
25
27
26
28
27
# TODO(clalancette): we can't currently use launch_testing since there is
@@ -37,6 +36,10 @@ class TestExecutablesDemo(unittest.TestCase):
37
36
self._ls.include_launch_description(self.generate_launch_description())
38
37
self._saw_cam2image_output = False
39
38
self._saw_showimage_output = False
39
+ # Windows can emit partial lines, so buffer the data and only check
40
+ # check when we have a complete line
41
+ self._cam2image_buffer = io.StringIO()
42
+ self._showimage_buffer = io.StringIO()
40
43
41
44
def generate_launch_description(self):
42
45
launch_description = LaunchDescription()
@@ -58,7 +61,7 @@ class TestExecutablesDemo(unittest.TestCase):
58
61
59
62
command = [showimage_executable]
60
63
command.extend(subscriber_executable_args)
61
- showimage_process = ExecuteProcess(
64
+ showimage_process = launch.actions. ExecuteProcess(
62
65
cmd=command,
63
66
name=showimage_name,
64
67
env=env,
@@ -73,7 +76,7 @@ class TestExecutablesDemo(unittest.TestCase):
73
76
74
77
command = [cam2image_executable]
75
78
command.extend(publisher_executable_args)
76
- cam2image_process = ExecuteProcess(
79
+ cam2image_process = launch.actions. ExecuteProcess(
77
80
cmd=command,
78
81
name=cam2image_name,
79
82
env=env,
@@ -82,31 +85,53 @@ class TestExecutablesDemo(unittest.TestCase):
82
85
launch_description.add_action(cam2image_process)
83
86
84
87
launch_description.add_action(
85
- RegisterEventHandler(
86
- OnProcessIO(
88
+ launch.actions. RegisterEventHandler(
89
+ launch.event_handlers. OnProcessIO(
87
90
on_stdout=self.append,
88
91
on_stderr=self.append,
89
92
)
90
93
)
91
94
)
92
95
96
+ launch_description.add_action(
97
+ launch.actions.TimerAction(
98
+ period=10.0,
99
+ actions=[launch.actions.Shutdown(reason='Timer expired')])
100
+ )
101
+
93
102
return launch_description
94
103
104
+ def append_and_check(self, process_io, process_name, text_to_check):
105
+ if process_name in process_io.process_name:
106
+ buffer = getattr(self, '_' + process_name + '_buffer')
107
+ buffer.write(process_io.text.decode(errors='replace'))
108
+ buffer.seek(0)
109
+ last_line = None
110
+ for line in buffer:
111
+ # Note that this does not use os.linesep; apparently rclpy
112
+ # node.get_logger().info doesn't use the OS line separator
113
+ if line.endswith('\n'):
114
+ # We have a complete line, see if it has what we want
115
+ if text_to_check in line[:-len(os.linesep)]:
116
+ setattr(self, '_saw_' + process_name + '_output', True)
117
+ break
118
+ else:
119
+ last_line = line
120
+ break
121
+ buffer.seek(0)
122
+ buffer.truncate(0)
123
+ if last_line is not None:
124
+ buffer.write(last_line)
125
+
95
126
def append(self, process_io):
96
- if 'cam2image' in process_io.process_name:
97
- if 'Publishing image #' in process_io.text.decode():
98
- self._saw_cam2image_output = True
99
- elif 'showimage' in process_io.process_name:
100
- if 'Received image #' in process_io.text.decode():
101
- self._saw_showimage_output = True
127
+ self.append_and_check(process_io, 'cam2image', 'Publishing image #')
128
+ self.append_and_check(process_io, 'showimage', 'Received image #')
102
129
103
130
if self._saw_cam2image_output and self._saw_showimage_output:
104
131
# We've seen all required arguments from the test, quit
105
- return EmitEvent(event=Shutdown(reason='finished', due_to_sigint=False))
106
-
107
- if time.time() - self._start_time > 10.0:
108
- # We've waited more than 10 seconds with no output; raise an error
109
- return EmitEvent(event=Shutdown(reason='timeout', due_to_sigint=False))
132
+ return launch.actions.EmitEvent(
133
+ event=launch.events.Shutdown(reason='finished', due_to_sigint=False)
134
+ )
110
135
111
136
def test_reliable_qos(self):
112
137
self._ls.run()
0 commit comments