OLD | NEW |
| (Empty) |
1 # Copyright (c) 2001-2007 Twisted Matrix Laboratories. | |
2 # See LICENSE for details. | |
3 | |
4 import StringIO | |
5 import sys | |
6 | |
7 # Twisted Imports | |
8 from twisted.trial import unittest | |
9 from twisted.spread import banana | |
10 from twisted.python import failure | |
11 from twisted.internet import protocol, main | |
12 | |
13 | |
14 class MathTestCase(unittest.TestCase): | |
15 def testInt2b128(self): | |
16 funkylist = range(0,100) + range(1000,1100) + range(1000000,1000100) + [
1024 **10l] | |
17 for i in funkylist: | |
18 x = StringIO.StringIO() | |
19 banana.int2b128(i, x.write) | |
20 v = x.getvalue() | |
21 y = banana.b1282int(v) | |
22 assert y == i, "y = %s; i = %s" % (y,i) | |
23 | |
24 class BananaTestCase(unittest.TestCase): | |
25 | |
26 encClass = banana.Banana | |
27 | |
28 def setUp(self): | |
29 self.io = StringIO.StringIO() | |
30 self.enc = self.encClass() | |
31 self.enc.makeConnection(protocol.FileWrapper(self.io)) | |
32 self.enc._selectDialect("none") | |
33 self.enc.expressionReceived = self.putResult | |
34 | |
35 def putResult(self, result): | |
36 self.result = result | |
37 | |
38 def tearDown(self): | |
39 self.enc.connectionLost(failure.Failure(main.CONNECTION_DONE)) | |
40 del self.enc | |
41 | |
42 def testString(self): | |
43 self.enc.sendEncoded("hello") | |
44 l = [] | |
45 self.enc.dataReceived(self.io.getvalue()) | |
46 assert self.result == 'hello' | |
47 | |
48 def testLong(self): | |
49 self.enc.sendEncoded(1015l) | |
50 self.enc.dataReceived(self.io.getvalue()) | |
51 assert self.result == 1015l, "should be 1015l, got %s" % self.result | |
52 | |
53 | |
54 def test_largeLong(self): | |
55 """ | |
56 Test that various longs greater than 2 ** 32 - 1 round-trip through | |
57 banana properly. | |
58 """ | |
59 for exp in (32, 64, 128, 256): | |
60 for add in (0, 1): | |
61 n = 2 ** exp + add | |
62 self.io.truncate(0) | |
63 self.enc.sendEncoded(n) | |
64 self.enc.dataReceived(self.io.getvalue()) | |
65 self.assertEqual(self.result, n) | |
66 | |
67 | |
68 def _getSmallest(self): | |
69 # How many bytes of prefix our implementation allows | |
70 bytes = self.enc.prefixLimit | |
71 # How many useful bits we can extract from that based on Banana's | |
72 # base-128 representation. | |
73 bits = bytes * 7 | |
74 # The largest number we _should_ be able to encode | |
75 largest = 2 ** bits - 1 | |
76 # The smallest number we _shouldn't_ be able to encode | |
77 smallest = largest + 1 | |
78 return smallest | |
79 | |
80 | |
81 def test_encodeTooLargeLong(self): | |
82 """ | |
83 Test that a long above the implementation-specific limit is rejected | |
84 as too large to be encoded. | |
85 """ | |
86 smallest = self._getSmallest() | |
87 self.assertRaises(banana.BananaError, self.enc.sendEncoded, smallest) | |
88 | |
89 | |
90 def test_decodeTooLargeLong(self): | |
91 """ | |
92 Test that a long above the implementation specific limit is rejected | |
93 as too large to be decoded. | |
94 """ | |
95 smallest = self._getSmallest() | |
96 self.enc.setPrefixLimit(self.enc.prefixLimit * 2) | |
97 self.enc.sendEncoded(smallest) | |
98 encoded = self.io.getvalue() | |
99 self.io.truncate(0) | |
100 self.enc.setPrefixLimit(self.enc.prefixLimit / 2) | |
101 | |
102 self.assertRaises(banana.BananaError, self.enc.dataReceived, encoded) | |
103 | |
104 | |
105 def _getLargest(self): | |
106 return -self._getSmallest() | |
107 | |
108 | |
109 def test_encodeTooSmallLong(self): | |
110 """ | |
111 Test that a negative long below the implementation-specific limit is | |
112 rejected as too small to be encoded. | |
113 """ | |
114 largest = self._getLargest() | |
115 self.assertRaises(banana.BananaError, self.enc.sendEncoded, largest) | |
116 | |
117 | |
118 def test_decodeTooSmallLong(self): | |
119 """ | |
120 Test that a negative long below the implementation specific limit is | |
121 rejected as too small to be decoded. | |
122 """ | |
123 largest = self._getLargest() | |
124 self.enc.setPrefixLimit(self.enc.prefixLimit * 2) | |
125 self.enc.sendEncoded(largest) | |
126 encoded = self.io.getvalue() | |
127 self.io.truncate(0) | |
128 self.enc.setPrefixLimit(self.enc.prefixLimit / 2) | |
129 | |
130 self.assertRaises(banana.BananaError, self.enc.dataReceived, encoded) | |
131 | |
132 | |
133 def testNegativeLong(self): | |
134 self.enc.sendEncoded(-1015l) | |
135 self.enc.dataReceived(self.io.getvalue()) | |
136 assert self.result == -1015l, "should be -1015l, got %s" % self.result | |
137 | |
138 def testInteger(self): | |
139 self.enc.sendEncoded(1015) | |
140 self.enc.dataReceived(self.io.getvalue()) | |
141 assert self.result == 1015, "should be 1015, got %s" % self.result | |
142 | |
143 def testNegative(self): | |
144 self.enc.sendEncoded(-1015) | |
145 self.enc.dataReceived(self.io.getvalue()) | |
146 assert self.result == -1015, "should be -1015, got %s" % self.result | |
147 | |
148 def testFloat(self): | |
149 self.enc.sendEncoded(1015.) | |
150 self.enc.dataReceived(self.io.getvalue()) | |
151 assert self.result == 1015. | |
152 | |
153 def testList(self): | |
154 foo = [1, 2, [3, 4], [30.5, 40.2], 5, ["six", "seven", ["eight", 9]], [1
0], []] | |
155 self.enc.sendEncoded(foo) | |
156 self.enc.dataReceived(self.io.getvalue()) | |
157 assert self.result == foo, "%s!=%s" % (repr(self.result), repr(self.resu
lt)) | |
158 | |
159 def testPartial(self): | |
160 foo = [1, 2, [3, 4], [30.5, 40.2], 5, | |
161 ["six", "seven", ["eight", 9]], [10], | |
162 # TODO: currently the C implementation's a bit buggy... | |
163 sys.maxint * 3l, sys.maxint * 2l, sys.maxint * -2l] | |
164 self.enc.sendEncoded(foo) | |
165 for byte in self.io.getvalue(): | |
166 self.enc.dataReceived(byte) | |
167 assert self.result == foo, "%s!=%s" % (repr(self.result), repr(foo)) | |
168 | |
169 def feed(self, data): | |
170 for byte in data: | |
171 self.enc.dataReceived(byte) | |
172 def testOversizedList(self): | |
173 data = '\x02\x01\x01\x01\x01\x80' | |
174 # list(size=0x0101010102, about 4.3e9) | |
175 self.failUnlessRaises(banana.BananaError, self.feed, data) | |
176 def testOversizedString(self): | |
177 data = '\x02\x01\x01\x01\x01\x82' | |
178 # string(size=0x0101010102, about 4.3e9) | |
179 self.failUnlessRaises(banana.BananaError, self.feed, data) | |
180 | |
181 def testCrashString(self): | |
182 crashString = '\x00\x00\x00\x00\x04\x80' | |
183 # string(size=0x0400000000, about 17.2e9) | |
184 | |
185 # cBanana would fold that into a 32-bit 'int', then try to allocate | |
186 # a list with PyList_New(). cBanana ignored the NULL return value, | |
187 # so it would segfault when trying to free the imaginary list. | |
188 | |
189 # This variant doesn't segfault straight out in my environment. | |
190 # Instead, it takes up large amounts of CPU and memory... | |
191 #crashString = '\x00\x00\x00\x00\x01\x80' | |
192 # print repr(crashString) | |
193 #self.failUnlessRaises(Exception, self.enc.dataReceived, crashString) | |
194 try: | |
195 # should now raise MemoryError | |
196 self.enc.dataReceived(crashString) | |
197 except banana.BananaError: | |
198 pass | |
199 | |
200 def testCrashNegativeLong(self): | |
201 # There was a bug in cBanana which relied on negating a negative integer | |
202 # always giving a postive result, but for the lowest possible number in | |
203 # 2s-complement arithmetic, that's not true, i.e. | |
204 # long x = -2147483648; | |
205 # long y = -x; | |
206 # x == y; /* true! */ | |
207 # (assuming 32-bit longs) | |
208 self.enc.sendEncoded(-2147483648) | |
209 self.enc.dataReceived(self.io.getvalue()) | |
210 assert self.result == -2147483648, "should be -2147483648, got %s" % sel
f.result | |
211 | |
212 | |
213 def test_sizedIntegerTypes(self): | |
214 """ | |
215 Test that integers below the maximum C{INT} token size cutoff are | |
216 serialized as C{INT} or C{NEG} and that larger integers are | |
217 serialized as C{LONGINT} or C{LONGNEG}. | |
218 """ | |
219 def encoded(n): | |
220 self.io.seek(0) | |
221 self.io.truncate() | |
222 self.enc.sendEncoded(n) | |
223 return self.io.getvalue() | |
224 | |
225 baseIntIn = +2147483647 | |
226 baseNegIn = -2147483648 | |
227 | |
228 baseIntOut = '\x7f\x7f\x7f\x07\x81' | |
229 self.assertEqual(encoded(baseIntIn - 2), '\x7d' + baseIntOut) | |
230 self.assertEqual(encoded(baseIntIn - 1), '\x7e' + baseIntOut) | |
231 self.assertEqual(encoded(baseIntIn - 0), '\x7f' + baseIntOut) | |
232 | |
233 baseLongIntOut = '\x00\x00\x00\x08\x85' | |
234 self.assertEqual(encoded(baseIntIn + 1), '\x00' + baseLongIntOut) | |
235 self.assertEqual(encoded(baseIntIn + 2), '\x01' + baseLongIntOut) | |
236 self.assertEqual(encoded(baseIntIn + 3), '\x02' + baseLongIntOut) | |
237 | |
238 baseNegOut = '\x7f\x7f\x7f\x07\x83' | |
239 self.assertEqual(encoded(baseNegIn + 2), '\x7e' + baseNegOut) | |
240 self.assertEqual(encoded(baseNegIn + 1), '\x7f' + baseNegOut) | |
241 self.assertEqual(encoded(baseNegIn + 0), '\x00\x00\x00\x00\x08\x83') | |
242 | |
243 baseLongNegOut = '\x00\x00\x00\x08\x86' | |
244 self.assertEqual(encoded(baseNegIn - 1), '\x01' + baseLongNegOut) | |
245 self.assertEqual(encoded(baseNegIn - 2), '\x02' + baseLongNegOut) | |
246 self.assertEqual(encoded(baseNegIn - 3), '\x03' + baseLongNegOut) | |
247 | |
248 | |
249 | |
250 class GlobalCoderTests(unittest.TestCase): | |
251 """ | |
252 Tests for the free functions L{banana.encode} and L{banana.decode}. | |
253 """ | |
254 def test_statelessDecode(self): | |
255 """ | |
256 Test that state doesn't carry over between calls to L{banana.decode}. | |
257 """ | |
258 # Banana encoding of 2 ** 449 | |
259 undecodable = '\x7f' * 65 + '\x85' | |
260 self.assertRaises(banana.BananaError, banana.decode, undecodable) | |
261 | |
262 # Banana encoding of 1 | |
263 decodable = '\x01\x81' | |
264 self.assertEqual(banana.decode(decodable), 1) | |
OLD | NEW |