OLD | NEW |
| (Empty) |
1 # Copyright (c) 2001-2005 Twisted Matrix Laboratories. | |
2 # See LICENSE for details. | |
3 | |
4 | |
5 from twisted.trial import unittest | |
6 | |
7 from twisted.words.protocols import toc | |
8 from twisted.internet import protocol, main | |
9 from twisted.python import failure | |
10 | |
11 import StringIO | |
12 from struct import pack,unpack | |
13 | |
14 class StringIOWithoutClosing(StringIO.StringIO): | |
15 def close(self): | |
16 pass | |
17 | |
18 class DummyTOC(toc.TOC): | |
19 """ | |
20 used to override authentication, now overrides printing. | |
21 """ | |
22 def _debug(self,data): | |
23 pass | |
24 SEQID=1001 | |
25 def flap(type,data): | |
26 global SEQID | |
27 send="*" | |
28 send=send+pack("!BHH",type,SEQID,len(data)) | |
29 send=send+data | |
30 SEQID=SEQID+1 | |
31 return send | |
32 def readFlap(data): | |
33 if data=="": return [None,""] | |
34 null,type,seqid,length=unpack("!BBHH",data[:6]) | |
35 val=data[6:6+length] | |
36 return [[type,val],data[6+length:]] | |
37 | |
38 class TOCGeneralTestCase(unittest.TestCase): | |
39 """ | |
40 general testing of TOC functions. | |
41 """ | |
42 def testTOC(self): | |
43 self.runTest() | |
44 def runTest(self): | |
45 USERS=2 | |
46 data=range(USERS) | |
47 data[0]=("FLAPON\r\n\r\n",\ | |
48 flap(1,"\000\000\000\001\000\001\000\004test"),\ | |
49 flap(2,"toc_signon localhost 9999 test 0x100000 english \"penguin 0.1\"\
000"),\ | |
50 flap(2,"toc_add_buddy test\000"),\ | |
51 flap(2,"toc_init_done\000"),\ | |
52 flap(2,"toc_send_im test \"hi\"\000"),\ | |
53 flap(2,"toc_send_im test2 \"hello\"\000"),\ | |
54 flap(2,"toc_set_away \"not here\"\000"),\ | |
55 flap(2,"toc_set_idle 602\000"),\ | |
56 flap(2,"toc_set_idle 0\000"),\ | |
57 flap(2,"toc_set_away\000"),\ | |
58 flap(2,"toc_evil test norm\000"),\ | |
59 flap(2,"toc_chat_join 4 \"Test Chat\"\000"),\ | |
60 flap(2,"toc_chat_send 0 \"hello\"\000"),\ | |
61 #flap(2,"toc_chat_leave 0\000")) #,\ | |
62 flap(2,"toc_chat_invite 0 \"come\" ooga\000"),\ | |
63 #flap(2,"toc_chat_accept 0\000"),\ | |
64 flap(5,"\000"),\ | |
65 flap(2,"toc_chat_whisper 0 ooga \"boo ga\"\000"),\ | |
66 flap(2,"toc_chat_leave 0"),\ | |
67 flap(5,"\000")) | |
68 data[1]=("FLAPON\r\n\r\n",\ | |
69 flap(1,"\000\000\000\001\000\001\000\004ooga"),\ | |
70 flap(2,"toc_signon localhost 9999 ooga 0x100000 english \"penguin 0.1\"\
000"),\ | |
71 flap(2,"toc_add_buddy test\000"),\ | |
72 flap(2,"toc_init_done\000"),\ | |
73 flap(5,"\000"),\ | |
74 flap(5,"\000"),\ | |
75 #flap(5,"\000"),\ | |
76 #flap(5,"\000"),\ | |
77 #flap(5,"\000"),\ | |
78 flap(5,"\000"),\ | |
79 flap(5,"\000"),\ | |
80 flap(5,"\000"),\ | |
81 flap(5,"\000"),\ | |
82 flap(5,"\000"),\ | |
83 flap(5,"\000"),\ | |
84 flap(5,"\000"),\ | |
85 #flap(5,"\000"),\ | |
86 flap(2,"toc_chat_accept 0\000"),\ | |
87 flap(2,"toc_chat_send 0 \"hi test\"\000"),\ | |
88 flap(5,"\000"),\ | |
89 flap(2,"toc_chat_leave 0\000")) | |
90 strings=range(USERS) | |
91 for i in strings: | |
92 strings[i]=StringIOWithoutClosing() | |
93 fac=toc.TOCFactory() | |
94 dummy=range(USERS) | |
95 for i in dummy: | |
96 dummy[i]=DummyTOC() | |
97 dummy[i].factory=fac | |
98 dummy[i].makeConnection(protocol.FileWrapper(strings[i])) | |
99 while reduce(lambda x,y:x+y,map(lambda x:x==(),data))!=USERS: | |
100 for i in range(USERS): | |
101 d=data[i] | |
102 if len(d)>0: | |
103 k,data[i]=d[0],d[1:] | |
104 for j in k: | |
105 dummy[i].dataReceived(j) # test by doing a character at
a time | |
106 else: | |
107 dummy[i].connectionLost(failure.Failure(main.CONNECTION_DONE
)) | |
108 values=range(USERS) | |
109 for i in values: | |
110 values[i]=strings[i].getvalue() | |
111 flaps=map(lambda x:[],range(USERS)) | |
112 for value in values: | |
113 i=values.index(value) | |
114 f,value=readFlap(value) | |
115 while f: | |
116 flaps[i].append(f) | |
117 f,value=readFlap(value) | |
118 ts=range(USERS) | |
119 for t in ts: | |
120 ts[t]=dummy[t].signontime | |
121 shouldequal=range(USERS) | |
122 shouldequal[0]=[ \ | |
123 [1,"\000\000\000\001"],\ | |
124 [2,"SIGN_ON:TOC1.0\000"],\ | |
125 [2,"NICK:test\000"],\ | |
126 [2,"CONFIG:\00"],\ | |
127 [2,"UPDATE_BUDDY:test:T:0:%s:0: O\000"%ts[0]],\ | |
128 [2,"IM_IN:test:F:hi\000"],\ | |
129 [2,"ERROR:901:test2\000"],\ | |
130 #[2,"UPDATE_BUDDY:test:T:0:%s:0: O\000"%ts[0]],\ | |
131 [2,"UPDATE_BUDDY:test:T:0:%s:0: OU\000"%ts[0]],\ | |
132 [2,"UPDATE_BUDDY:test:T:0:%s:10: OU\000"%ts[0]],\ | |
133 [2,"UPDATE_BUDDY:test:T:0:%s:0: OU\000"%ts[0]],\ | |
134 [2,"UPDATE_BUDDY:test:T:0:%s:0: O\000"%ts[0]],\ | |
135 [2,"EVILED:10:test\000"],\ | |
136 [2,"UPDATE_BUDDY:test:T:10:%s:0: O\000"%ts[0]],\ | |
137 [2,"CHAT_JOIN:0:Test Chat\000"],\ | |
138 [2,"CHAT_UPDATE_BUDDY:0:T:test\000"],\ | |
139 [2,"CHAT_IN:0:test:F:hello\000"],\ | |
140 [2,"CHAT_UPDATE_BUDDY:0:T:ooga\000"],\ | |
141 [2,"CHAT_IN:0:ooga:F:hi test\000"],\ | |
142 [2,"CHAT_LEFT:0\000"]] | |
143 shouldequal[1]=[ \ | |
144 [1,"\000\000\000\001"],\ | |
145 [2,"SIGN_ON:TOC1.0\000"],\ | |
146 [2,"NICK:ooga\000"],\ | |
147 [2,"CONFIG:\000"],\ | |
148 #[2,"UPDATE_BUDDY:test:T:0:%s:0: O\000"%ts[0]],\ | |
149 [2,"UPDATE_BUDDY:test:T:0:%s:0: OU\000"%ts[0]],\ | |
150 [2,"UPDATE_BUDDY:test:T:0:%s:10: OU\000"%ts[0]],\ | |
151 [2,"UPDATE_BUDDY:test:T:0:%s:0: OU\000"%ts[0]],\ | |
152 [2,"UPDATE_BUDDY:test:T:0:%s:0: O\000"%ts[0]],\ | |
153 [2,"UPDATE_BUDDY:test:T:10:%s:0: O\000"%ts[0]],\ | |
154 [2,"CHAT_INVITE:Test Chat:0:test:come\000"],\ | |
155 [2,"CHAT_JOIN:0:Test Chat\000"],\ | |
156 [2,"CHAT_UPDATE_BUDDY:0:T:test:ooga\000"],\ | |
157 [2,"CHAT_IN:0:ooga:F:hi test\000"],\ | |
158 [2,"CHAT_IN:0:test:T:boo ga\000"],\ | |
159 [2,"CHAT_UPDATE_BUDDY:0:F:test\000"],\ | |
160 [2,"CHAT_LEFT:0\000"]] | |
161 if flaps!=shouldequal: | |
162 for i in range(len(shouldequal)): | |
163 for j in range(len(shouldequal[i])): | |
164 if shouldequal[i][j]!=flaps[i][j]: | |
165 raise AssertionError("GeneralTest Failed!\nUser %s Line
%s\nactual:%s\nshould be:%s"%(i,j,flaps[i][j],shouldequal[i][j])) | |
166 raise AssertionError("GeneralTest Failed with incorrect lengths!") | |
167 class TOCMultiPacketTestCase(unittest.TestCase): | |
168 """ | |
169 i saw this problem when using GAIM. It only read the flaps onces per dataRe
ceived, and would basically block if it ever received two packets together in on
e dataReceived. this tests for that occurance. | |
170 """ | |
171 def testTOC(self): | |
172 self.runTest() | |
173 def runTest(self): | |
174 packets=["FLAPON\r\n\r\n",\ | |
175 flap(1,"\000\000\000\001\000\001\000\004test"),\ | |
176 flap(2,"toc_signon null 9999 test 0x100000 english \"penguin 0.1\"\000"
),\ | |
177 flap(2,"toc_init_done\000"),\ | |
178 flap(2,"toc_send_im test hi\000")] | |
179 shouldbe=[[1,"\000\000\000\001"],\ | |
180 [2,"SIGN_ON:TOC1.0\000"],\ | |
181 [2,"NICK:test\000"],\ | |
182 [2,"CONFIG:\000"],\ | |
183 [2,"IM_IN:test:F:hi\000"]] | |
184 data="" | |
185 for i in packets: | |
186 data=data+i | |
187 s=StringIOWithoutClosing() | |
188 d=DummyTOC() | |
189 fac=toc.TOCFactory() | |
190 d.factory=fac | |
191 d.makeConnection(protocol.FileWrapper(s)) | |
192 d.dataReceived(data) | |
193 d.connectionLost(failure.Failure(main.CONNECTION_DONE)) | |
194 value=s.getvalue() | |
195 flaps=[] | |
196 f,value=readFlap(value) | |
197 while f: | |
198 flaps.append(f) | |
199 f,value=readFlap(value) | |
200 if flaps!=shouldbe: | |
201 for i in range(len(flaps)): | |
202 if flaps[i]!=shouldbe[i]:raise AssertionError("MultiPacketTest F
ailed!\nactual:%s\nshould be:%s"%(flaps[i],shouldbe[i])) | |
203 raise AssertionError("MultiPacketTest Failed with incorrect length!,
printing both lists\nactual:%s\nshould be:%s"%(flaps,shouldbe)) | |
204 class TOCSavedValuesTestCase(unittest.TestCase): | |
205 def testTOC(self): | |
206 self.runTest() | |
207 def runTest(self): | |
208 password1=toc.roast("test pass") | |
209 password2=toc.roast("pass test") | |
210 beforesend=[\ | |
211 "FLAPON\r\n\r\n",\ | |
212 flap(1,"\000\000\000\001\000\001\000\004test"),\ | |
213 flap(2,"toc_signon localhost 9999 test %s english \"penguin 0.1\"\000"%
password1),\ | |
214 flap(2,"toc_init_done\000"),\ | |
215 flap(2,"toc_set_config \"{m 4}\"\000"),\ | |
216 flap(2,"toc_format_nickname BOOGA\000"),\ | |
217 flap(2,"toc_format_nickname \"T E S T\"\000"),\ | |
218 flap(2,"toc_change_passwd \"testpass\" \"pass test\"\000"),\ | |
219 flap(2,"toc_change_passwd \"test pass\" \"pass test\"\000")] | |
220 beforeexpect=[\ | |
221 [1,"\000\000\000\001"],\ | |
222 [2,"SIGN_ON:TOC1.0\000"],\ | |
223 [2,"NICK:test\000"],\ | |
224 [2,"CONFIG:\000"],\ | |
225 [2,"ERROR:911\000"],\ | |
226 [2,"ADMIN_NICK_STATUS:0\000"],\ | |
227 [2,"ERROR:911\000"],\ | |
228 [2,"ADMIN_PASSWD_STATUS:0\000"]] | |
229 badpasssend=[\ | |
230 "FLAPON\r\n\r\n",\ | |
231 flap(1,"\000\000\000\001\000\001\000\004test"),\ | |
232 flap(2,"toc_signon localhost 9999 test 0x1000 english \"penguin 0.1\"\0
00"),\ | |
233 flap(2,"toc_init_done")] | |
234 badpassexpect=[\ | |
235 [1,"\000\00\000\001"],\ | |
236 [2,"ERROR:980\000"]] | |
237 goodpasssend=[\ | |
238 "FLAPON\r\n\r\n",\ | |
239 flap(1,"\000\000\000\001\000\001\000\004test"),\ | |
240 flap(2,"toc_signon localhost 9999 test %s english \"penguin 0.1\"\000"%
password2),\ | |
241 flap(2,"toc_init_done")] | |
242 goodpassexpect=[\ | |
243 [1,"\000\000\000\001"],\ | |
244 [2,"SIGN_ON:TOC1.0\000"],\ | |
245 [2,"NICK:T E S T\000"],\ | |
246 [2,"CONFIG:{m 4}\000"]] | |
247 fac=toc.TOCFactory() | |
248 d=DummyTOC() | |
249 d.factory=fac | |
250 s=StringIOWithoutClosing() | |
251 d.makeConnection(protocol.FileWrapper(s)) | |
252 for i in beforesend: | |
253 d.dataReceived(i) | |
254 d.connectionLost(failure.Failure(main.CONNECTION_DONE)) | |
255 v=s.getvalue() | |
256 flaps=[] | |
257 f,v=readFlap(v) | |
258 while f: | |
259 flaps.append(f) | |
260 f,v=readFlap(v) | |
261 if flaps!=beforeexpect: | |
262 for i in range(len(flaps)): | |
263 if flaps[i]!=beforeexpect[i]: | |
264 raise AssertionError("SavedValuesTest Before Failed!\nactual
:%s\nshould be:%s"%(flaps[i],beforeexpect[i])) | |
265 raise AssertionError("SavedValuesTest Before Failed with incorrect l
ength!\nactual:%s\nshould be:%s"%(flaps,beforeexpect)) | |
266 d=DummyTOC() | |
267 d.factory=fac | |
268 s=StringIOWithoutClosing() | |
269 d.makeConnection(protocol.FileWrapper(s)) | |
270 for i in badpasssend: | |
271 d.dataReceived(i) | |
272 d.connectionLost(failure.Failure(main.CONNECTION_DONE)) | |
273 v=s.getvalue() | |
274 flaps=[] | |
275 f,v=readFlap(v) | |
276 while f: | |
277 flaps.append(f) | |
278 f,v=readFlap(v) | |
279 if flaps!=badpassexpect: | |
280 for i in range(len(flaps)): | |
281 if flaps[i]!=badpassexpect[i]: | |
282 raise AssertionError("SavedValuesTest BadPass Failed!\nactua
l:%s\nshould be:%s"%(flaps[i],badpassexpect[i])) | |
283 raise AssertionError("SavedValuesTest BadPass Failed with incorrect
length!\nactual:%s\nshould be:%s"%(flaps,badpassexpect)) | |
284 d=DummyTOC() | |
285 d.factory=fac | |
286 s=StringIOWithoutClosing() | |
287 d.makeConnection(protocol.FileWrapper(s)) | |
288 for i in goodpasssend: | |
289 d.dataReceived(i) | |
290 d.connectionLost(failure.Failure(main.CONNECTION_DONE)) | |
291 v=s.getvalue() | |
292 flaps=[] | |
293 f,v=readFlap(v) | |
294 while f: | |
295 flaps.append(f) | |
296 f,v=readFlap(v) | |
297 if flaps!=goodpassexpect: | |
298 for i in range(len(flaps)): | |
299 if flaps[i]!=goodpassexpect[i]: | |
300 raise AssertionError("SavedValuesTest GoodPass Failed!\nactu
al:%s\nshould be:%s"%(flaps[i],goodpassexpect[i])) | |
301 raise AssertionError("SavedValuesTest GoodPass Failed with incorrect
length!\nactual:%s\nshould be:%s"%(flaps,beforeexpect)) | |
302 class TOCPrivacyTestCase(unittest.TestCase): | |
303 def runTest(self): | |
304 sends=["FLAPON\r\n\r\n",\ | |
305 flap(1,"\000\000\000\001\000\001\000\004test"),\ | |
306 flap(2,"toc_signon localhost 9999 test 0x00 english penguin\000"),\ | |
307 flap(2,"toc_init_done\000"),\ | |
308 flap(2,"toc_add_deny\000"),\ | |
309 flap(2,"toc_send_im test 1\000"),\ | |
310 flap(2,"toc_add_deny test\000"),\ | |
311 flap(2,"toc_send_im test 2\000"),\ | |
312 flap(2,"toc_add_permit\000"),\ | |
313 flap(2,"toc_send_im test 3\000"),\ | |
314 flap(2,"toc_add_permit test\000"),\ | |
315 flap(2,"toc_send_im test 4\000")] | |
316 expect=[[1,"\000\000\000\001"],\ | |
317 [2,"SIGN_ON:TOC1.0\000"],\ | |
318 [2,"NICK:test\000"],\ | |
319 [2,"CONFIG:\000"],\ | |
320 [2,"IM_IN:test:F:1\000"],\ | |
321 [2,"ERROR:901:test\000"],\ | |
322 [2,"ERROR:901:test\000"],\ | |
323 [2,"IM_IN:test:F:4\000"]] | |
324 d=DummyTOC() | |
325 d.factory=toc.TOCFactory() | |
326 s=StringIOWithoutClosing() | |
327 d.makeConnection(protocol.FileWrapper(s)) | |
328 for i in sends: | |
329 d.dataReceived(i) | |
330 d.connectionLost(failure.Failure(main.CONNECTION_DONE)) | |
331 v=s.getvalue() | |
332 flaps=[] | |
333 f,v=readFlap(v) | |
334 while f: | |
335 flaps.append(f) | |
336 f,v=readFlap(v) | |
337 if flaps!=expect: | |
338 for i in range(len(flaps)): | |
339 if flaps[i]!=expect[i]: | |
340 raise AssertionError("PrivacyTest Before Failed!\nactual:%s\
nshould be:%s"%(flaps[i],expect[i])) | |
341 raise AssertionError("PrivacyTest Before Failed with incorrect lengt
h!\nactual:%s\nshould be:%s"%(flaps,expect)) | |
342 testCases=[TOCGeneralTestCase,TOCMultiPacketTestCase,TOCSavedValuesTestCase,TOCP
rivacyTestCase] | |
343 | |
OLD | NEW |