Skip to content

Commit ff4c8d3

Browse files
committed
Use pyte to do integration tests
1 parent bca8474 commit ff4c8d3

File tree

3 files changed

+187
-0
lines changed

3 files changed

+187
-0
lines changed

awsshell/app.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -432,6 +432,7 @@ def create_application(self, completer, history,
432432
editing_mode = EditingMode.EMACS
433433

434434
return Application(
435+
use_alternate_screen=self._output is not None,
435436
editing_mode=editing_mode,
436437
layout=self.create_layout(display_completions_in_columns, toolbar),
437438
mouse_support=False,

requirements-test.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,3 +8,4 @@ configobj==5.0.6
88
unittest2==1.1.0; python_version == '2.6'
99
# requires tmux < v2.0
1010
hecate==0.1.0
11+
pyte==0.6.0

tests/integration/test_ui_pyte.py

Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
import pyte
2+
import time
3+
import unittest
4+
import threading
5+
from awsshell import shellcomplete, autocomplete
6+
from awsshell.app import AWSShell
7+
from awsshell.docs import DocRetriever
8+
from prompt_toolkit.keys import Keys
9+
from prompt_toolkit.input import PipeInput
10+
from prompt_toolkit.layout.screen import Size
11+
from prompt_toolkit.terminal.vt100_output import Vt100_Output
12+
from prompt_toolkit.terminal.vt100_input import ANSI_SEQUENCES
13+
14+
_polly_index = {
15+
"children": {
16+
"describe-voices": {
17+
"children": {},
18+
"argument_metadata": {
19+
"--language-code": {
20+
"type_name": "string",
21+
"example": "",
22+
"api_name": "LanguageCode",
23+
"required": False,
24+
"minidoc": "LanguageCode minidoc string"
25+
}
26+
},
27+
"arguments": ["--language-code"],
28+
"commands": []
29+
}
30+
},
31+
"argument_metadata": {},
32+
"arguments": [],
33+
"commands": ["describe-voices"]
34+
}
35+
36+
_index_data = {
37+
'aws': {
38+
'commands': ['polly'],
39+
'children': {'polly': _polly_index},
40+
'arguments': [],
41+
'argument_metadata': {},
42+
}
43+
}
44+
45+
_doc_db = {b'aws.polly': 'Polly is a service'}
46+
47+
48+
class PyteOutput(object):
49+
50+
def __init__(self, columns=80, lines=24):
51+
self._columns = columns
52+
self._lines = lines
53+
self._screen = pyte.Screen(self._columns, self._lines)
54+
self._stream = pyte.ByteStream(self._screen)
55+
self.encoding = 'utf-8'
56+
57+
def write(self, data):
58+
self._stream.feed(data)
59+
60+
def flush(self):
61+
pass
62+
63+
def get_size(self):
64+
def _get_size():
65+
return Size(columns=self._columns, rows=self._lines)
66+
return _get_size
67+
68+
def display(self):
69+
return self._screen.display
70+
71+
72+
def _create_shell(ptk_input, ptk_output):
73+
io = {
74+
'input': ptk_input,
75+
'output': ptk_output,
76+
}
77+
doc_data = DocRetriever(_doc_db)
78+
model_completer = autocomplete.AWSCLIModelCompleter(_index_data)
79+
completer = shellcomplete.AWSShellCompleter(model_completer)
80+
return AWSShell(completer, model_completer, doc_data, **io)
81+
82+
83+
_ansi_sequence = dict((key, ansi) for ansi, key in ANSI_SEQUENCES.items())
84+
85+
86+
def _prompt_toolkit_key_to_ansi(key):
87+
return _ansi_sequence.get(key, '')
88+
89+
90+
class VirtualShell(object):
91+
92+
def __init__(self, shell=None):
93+
self._ptk_input = PipeInput()
94+
self._pyte = PyteOutput()
95+
self._ptk_output = Vt100_Output(self._pyte, self._pyte.get_size())
96+
97+
if shell is None:
98+
shell = _create_shell(self._ptk_input, self._ptk_output)
99+
100+
def _run_shell():
101+
shell.run()
102+
103+
self._thread = threading.Thread(target=_run_shell)
104+
self._thread.start()
105+
106+
def write(self, data):
107+
self._ptk_input.send_text(data)
108+
109+
def press_keys(self, *keys):
110+
sequences = [_prompt_toolkit_key_to_ansi(key) for key in keys]
111+
self.write(''.join(sequences))
112+
113+
def display(self):
114+
return self._pyte.display()
115+
116+
def quit(self):
117+
# figure out a better way to quit
118+
self.press_keys(Keys.F10)
119+
self._thread.join()
120+
self._ptk_input.close()
121+
122+
def __enter__(self):
123+
return self
124+
125+
def __exit__(self, exc_type, exc_val, exc_tb):
126+
self.quit()
127+
128+
129+
class ShellTest(unittest.TestCase):
130+
131+
def setUp(self):
132+
self.shell = VirtualShell()
133+
134+
def tearDown(self):
135+
self.shell.quit()
136+
137+
def write(self, data):
138+
self.shell.write(data)
139+
140+
def press_keys(self, *keys):
141+
self.shell.press_keys(*keys)
142+
143+
def _poll_display(self, timeout=2, interval=0.1):
144+
start = time.time()
145+
while time.time() <= start + timeout:
146+
display = self.shell.display()
147+
yield display
148+
time.sleep(interval)
149+
150+
def await_text(self, text):
151+
for display in self._poll_display():
152+
for line in display:
153+
if text in line:
154+
return
155+
display = '\n'.join(display)
156+
fail_message = '"{text}" not found on screen: \n{display}'
157+
self.fail(fail_message.format(text=text, display=display))
158+
159+
def test_toolbar_appears(self):
160+
self.await_text('[F3] Keys')
161+
162+
def test_input_works(self):
163+
self.write('ec2')
164+
self.await_text('ec2')
165+
166+
def test_completion_menu_operation(self):
167+
self.write('polly desc')
168+
self.await_text('describe-voices')
169+
170+
def test_completion_menu_argument(self):
171+
self.write('polly describe-voices --l')
172+
self.await_text('--language-code')
173+
174+
def test_doc_menu_appears(self):
175+
self.write('polly ')
176+
self.await_text('Polly is a service')
177+
178+
def test_doc_menu_is_searchable(self):
179+
self.write('polly ')
180+
self.await_text('Polly is a service')
181+
self.press_keys(Keys.F9)
182+
self.write('/')
183+
# wait for the input timeout
184+
time.sleep(0.6)
185+
self.await_text('Polly is a service')

0 commit comments

Comments
 (0)