Skip to content

Commit 25645ff

Browse files
committed
Use pyte to do integration tests
1 parent bca8474 commit 25645ff

File tree

3 files changed

+178
-20
lines changed

3 files changed

+178
-20
lines changed

awsshell/app.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -432,6 +432,7 @@ def create_application(self, completer, history,
432432
editing_mode = EditingMode.EMACS
433433

434434
return Application(
435+
use_alternate_screen=self._output is not None,
435436
editing_mode=editing_mode,
436437
layout=self.create_layout(display_completions_in_columns, toolbar),
437438
mouse_support=False,

requirements-test.txt

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,5 +6,4 @@ configobj==5.0.6
66
# Note you need at least pip --version of 6.0 or
77
# higher to be able to pick on these version specifiers.
88
unittest2==1.1.0; python_version == '2.6'
9-
# requires tmux < v2.0
10-
hecate==0.1.0
9+
pyte==0.6.0

tests/integration/test_ui.py

Lines changed: 176 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,181 @@
1-
from tests.compat import unittest
2-
from time import sleep
3-
from hecate.hecate import Runner, Timeout
1+
import pyte
2+
import time
3+
import unittest
4+
import threading
5+
from awsshell import shellcomplete, autocomplete
6+
from awsshell.app import AWSShell
7+
from awsshell.docs import DocRetriever
8+
from prompt_toolkit.keys import Keys
9+
from prompt_toolkit.input import PipeInput
10+
from prompt_toolkit.layout.screen import Size
11+
from prompt_toolkit.terminal.vt100_output import Vt100_Output
12+
from prompt_toolkit.terminal.vt100_input import ANSI_SEQUENCES
413

5-
class UITest(unittest.TestCase):
14+
_polly_index = {
15+
"children": {
16+
"describe-voices": {
17+
"children": {},
18+
"argument_metadata": {
19+
"--language-code": {
20+
"type_name": "string",
21+
"example": "",
22+
"api_name": "LanguageCode",
23+
"required": False,
24+
"minidoc": "LanguageCode minidoc string"
25+
}
26+
},
27+
"arguments": ["--language-code"],
28+
"commands": []
29+
}
30+
},
31+
"argument_metadata": {},
32+
"arguments": [],
33+
"commands": ["describe-voices"]
34+
}
35+
36+
_index_data = {
37+
'aws': {
38+
'commands': ['polly'],
39+
'children': {'polly': _polly_index},
40+
'arguments': [],
41+
'argument_metadata': {},
42+
}
43+
}
44+
45+
_doc_db = {b'aws.polly': 'Polly is a service'}
46+
47+
48+
class PyteOutput(object):
49+
50+
def __init__(self, columns=80, lines=24):
51+
self._columns = columns
52+
self._lines = lines
53+
self._screen = pyte.Screen(self._columns, self._lines)
54+
self._stream = pyte.ByteStream(self._screen)
55+
self.encoding = 'utf-8'
56+
57+
def write(self, data):
58+
self._stream.feed(data)
59+
60+
def flush(self):
61+
pass
62+
63+
def get_size(self):
64+
def _get_size():
65+
return Size(columns=self._columns, rows=self._lines)
66+
return _get_size
67+
68+
def display(self):
69+
return self._screen.display
70+
71+
72+
def _create_shell(ptk_input, ptk_output):
73+
io = {
74+
'input': ptk_input,
75+
'output': ptk_output,
76+
}
77+
doc_data = DocRetriever(_doc_db)
78+
model_completer = autocomplete.AWSCLIModelCompleter(_index_data)
79+
completer = shellcomplete.AWSShellCompleter(model_completer)
80+
return AWSShell(completer, model_completer, doc_data, **io)
81+
82+
83+
_ansi_sequence = dict((key, ansi) for ansi, key in ANSI_SEQUENCES.items())
84+
85+
86+
class VirtualShell(object):
87+
88+
def __init__(self, shell=None):
89+
self._ptk_input = PipeInput()
90+
self._pyte = PyteOutput()
91+
self._ptk_output = Vt100_Output(self._pyte, self._pyte.get_size())
92+
93+
if shell is None:
94+
shell = _create_shell(self._ptk_input, self._ptk_output)
95+
96+
def _run_shell():
97+
shell.run()
98+
99+
self._thread = threading.Thread(target=_run_shell)
100+
self._thread.start()
101+
102+
def write(self, data):
103+
self._ptk_input.send_text(data)
104+
105+
def press_keys(self, *keys):
106+
sequences = [_ansi_sequence[key] for key in keys]
107+
self.write(''.join(sequences))
108+
109+
def display(self):
110+
return self._pyte.display()
111+
112+
def quit(self):
113+
# figure out a better way to quit
114+
self.press_keys(Keys.F10)
115+
self._thread.join()
116+
self._ptk_input.close()
117+
118+
def __enter__(self):
119+
return self
120+
121+
def __exit__(self, exc_type, exc_val, exc_tb):
122+
self.quit()
123+
124+
125+
class ShellTest(unittest.TestCase):
6126

7127
def setUp(self):
8-
self.shell = Runner(u'aws-shell')
128+
self.shell = VirtualShell()
9129

10130
def tearDown(self):
11-
self.shell.shutdown()
12-
13-
def test_docs_are_searchable(self):
14-
shell = self.shell
15-
shell.await_text(u'aws>', timeout=2)
16-
shell.write(u'ec2 ')
17-
shell.await_text(u'Amazon EC2')
18-
shell.press(u'F9')
19-
shell.write(u'/')
20-
# wait for the input timeout to trigger
21-
sleep(1)
22-
# ensure documentation panel is still there
23-
shell.await_text(u'Amazon EC2')
131+
self.shell.quit()
132+
133+
def write(self, data):
134+
self.shell.write(data)
135+
136+
def press_keys(self, *keys):
137+
self.shell.press_keys(*keys)
138+
139+
def _poll_display(self, timeout=2, interval=0.1):
140+
start = time.time()
141+
while time.time() <= start + timeout:
142+
display = self.shell.display()
143+
yield display
144+
time.sleep(interval)
145+
146+
def await_text(self, text):
147+
for display in self._poll_display():
148+
for line in display:
149+
if text in line:
150+
return
151+
display = '\n'.join(display)
152+
fail_message = '"{text}" not found on screen: \n{display}'
153+
self.fail(fail_message.format(text=text, display=display))
154+
155+
def test_toolbar_appears(self):
156+
self.await_text('[F3] Keys')
157+
158+
def test_input_works(self):
159+
self.write('ec2')
160+
self.await_text('ec2')
161+
162+
def test_completion_menu_operation(self):
163+
self.write('polly desc')
164+
self.await_text('describe-voices')
165+
166+
def test_completion_menu_argument(self):
167+
self.write('polly describe-voices --l')
168+
self.await_text('--language-code')
169+
170+
def test_doc_menu_appears(self):
171+
self.write('polly ')
172+
self.await_text('Polly is a service')
173+
174+
def test_doc_menu_is_searchable(self):
175+
self.write('polly ')
176+
self.await_text('Polly is a service')
177+
self.press_keys(Keys.F9)
178+
self.write('/')
179+
# wait for the input timeout
180+
time.sleep(0.6)
181+
self.await_text('Polly is a service')

0 commit comments

Comments
 (0)