forked from bpython/bpython
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_crashers.py
More file actions
118 lines (99 loc) · 3.58 KB
/
test_crashers.py
File metadata and controls
118 lines (99 loc) · 3.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
import fcntl
import os
import pty
import struct
import sys
import termios
import textwrap
import unittest
try:
from unittest import skip
except ImportError:
def skip(f):
return lambda self: None
try:
from twisted.internet import reactor
from twisted.internet.defer import Deferred
from twisted.internet.protocol import ProcessProtocol
from twisted.trial.unittest import TestCase as TrialTestCase
except ImportError:
reactor = None
TEST_CONFIG = os.path.join(os.path.dirname(__file__), "test.config")
def set_win_size(fd, rows, columns):
s = struct.pack('HHHH', rows, columns, 0, 0)
fcntl.ioctl(fd, termios.TIOCSWINSZ, s)
class CrashersTest(object):
backend = "cli"
def run_bpython(self, input):
"""
Run bpython (with `backend` as backend) in a subprocess and
enter the given input. Uses a test config that disables the
paste detection.
Retuns bpython's output.
"""
result = Deferred()
class Protocol(ProcessProtocol):
STATES = (SEND_INPUT, COLLECT) = xrange(2)
def __init__(self):
self.data = ""
self.delayed_call = None
self.states = iter(self.STATES)
self.state = self.states.next()
def outReceived(self, data):
self.data += data
if self.delayed_call is not None:
self.delayed_call.cancel()
self.delayed_call = reactor.callLater(0.5, self.next)
def next(self):
self.delayed_call = None
if self.state == self.SEND_INPUT:
index = self.data.find(">>> ")
if index >= 0:
self.data = self.data[index + 4:]
self.transport.write(input)
self.state = self.states.next()
else:
self.transport.closeStdin()
if self.transport.pid is not None:
self.delayed_call = None
self.transport.signalProcess("TERM")
def processExited(self, reason):
if self.delayed_call is not None:
self.delayed_call.cancel()
result.callback(self.data)
(master, slave) = pty.openpty()
set_win_size(slave, 25, 80)
reactor.spawnProcess(Protocol(), sys.executable,
(sys.executable, "-m", "bpython." + self.backend,
"--config", TEST_CONFIG),
env=dict(TERM="vt100", LANG=os.environ.get("LANG", "")),
usePTY=(master, slave, os.ttyname(slave)))
return result
def test_issue108(self):
input = textwrap.dedent(
"""\
def spam():
u"y\\xe4y"
\b
spam(""")
deferred = self.run_bpython(input)
return deferred.addCallback(self.check_no_traceback)
def test_issue133(self):
input = textwrap.dedent(
"""\
def spam(a, (b, c)):
pass
\b
spam(1""")
return self.run_bpython(input).addCallback(self.check_no_traceback)
def check_no_traceback(self, data):
tb = data[data.find("Traceback"):]
self.assertTrue("Traceback" not in data, tb)
if reactor is not None:
class CursesCrashersTest(TrialTestCase, CrashersTest):
backend = "cli"
@skip("take 6 seconds, and Simon says we can skip them")
class UrwidCrashersTest(TrialTestCase, CrashersTest):
backend = "urwid"
if __name__ == "__main__":
unittest.main()