| OLD | NEW |
| (Empty) |
| 1 # Copyright (c) 2001-2005 Twisted Matrix Laboratories. | |
| 2 # See LICENSE for details. | |
| 3 | |
| 4 | |
| 5 from twisted.trial import unittest | |
| 6 | |
| 7 from twisted.words.protocols import toc | |
| 8 from twisted.internet import protocol, main | |
| 9 from twisted.python import failure | |
| 10 | |
| 11 import StringIO | |
| 12 from struct import pack,unpack | |
| 13 | |
| 14 class StringIOWithoutClosing(StringIO.StringIO): | |
| 15 def close(self): | |
| 16 pass | |
| 17 | |
| 18 class DummyTOC(toc.TOC): | |
| 19 """ | |
| 20 used to override authentication, now overrides printing. | |
| 21 """ | |
| 22 def _debug(self,data): | |
| 23 pass | |
| 24 SEQID=1001 | |
| 25 def flap(type,data): | |
| 26 global SEQID | |
| 27 send="*" | |
| 28 send=send+pack("!BHH",type,SEQID,len(data)) | |
| 29 send=send+data | |
| 30 SEQID=SEQID+1 | |
| 31 return send | |
| 32 def readFlap(data): | |
| 33 if data=="": return [None,""] | |
| 34 null,type,seqid,length=unpack("!BBHH",data[:6]) | |
| 35 val=data[6:6+length] | |
| 36 return [[type,val],data[6+length:]] | |
| 37 | |
| 38 class TOCGeneralTestCase(unittest.TestCase): | |
| 39 """ | |
| 40 general testing of TOC functions. | |
| 41 """ | |
| 42 def testTOC(self): | |
| 43 self.runTest() | |
| 44 def runTest(self): | |
| 45 USERS=2 | |
| 46 data=range(USERS) | |
| 47 data[0]=("FLAPON\r\n\r\n",\ | |
| 48 flap(1,"\000\000\000\001\000\001\000\004test"),\ | |
| 49 flap(2,"toc_signon localhost 9999 test 0x100000 english \"penguin 0.1\"\
000"),\ | |
| 50 flap(2,"toc_add_buddy test\000"),\ | |
| 51 flap(2,"toc_init_done\000"),\ | |
| 52 flap(2,"toc_send_im test \"hi\"\000"),\ | |
| 53 flap(2,"toc_send_im test2 \"hello\"\000"),\ | |
| 54 flap(2,"toc_set_away \"not here\"\000"),\ | |
| 55 flap(2,"toc_set_idle 602\000"),\ | |
| 56 flap(2,"toc_set_idle 0\000"),\ | |
| 57 flap(2,"toc_set_away\000"),\ | |
| 58 flap(2,"toc_evil test norm\000"),\ | |
| 59 flap(2,"toc_chat_join 4 \"Test Chat\"\000"),\ | |
| 60 flap(2,"toc_chat_send 0 \"hello\"\000"),\ | |
| 61 #flap(2,"toc_chat_leave 0\000")) #,\ | |
| 62 flap(2,"toc_chat_invite 0 \"come\" ooga\000"),\ | |
| 63 #flap(2,"toc_chat_accept 0\000"),\ | |
| 64 flap(5,"\000"),\ | |
| 65 flap(2,"toc_chat_whisper 0 ooga \"boo ga\"\000"),\ | |
| 66 flap(2,"toc_chat_leave 0"),\ | |
| 67 flap(5,"\000")) | |
| 68 data[1]=("FLAPON\r\n\r\n",\ | |
| 69 flap(1,"\000\000\000\001\000\001\000\004ooga"),\ | |
| 70 flap(2,"toc_signon localhost 9999 ooga 0x100000 english \"penguin 0.1\"\
000"),\ | |
| 71 flap(2,"toc_add_buddy test\000"),\ | |
| 72 flap(2,"toc_init_done\000"),\ | |
| 73 flap(5,"\000"),\ | |
| 74 flap(5,"\000"),\ | |
| 75 #flap(5,"\000"),\ | |
| 76 #flap(5,"\000"),\ | |
| 77 #flap(5,"\000"),\ | |
| 78 flap(5,"\000"),\ | |
| 79 flap(5,"\000"),\ | |
| 80 flap(5,"\000"),\ | |
| 81 flap(5,"\000"),\ | |
| 82 flap(5,"\000"),\ | |
| 83 flap(5,"\000"),\ | |
| 84 flap(5,"\000"),\ | |
| 85 #flap(5,"\000"),\ | |
| 86 flap(2,"toc_chat_accept 0\000"),\ | |
| 87 flap(2,"toc_chat_send 0 \"hi test\"\000"),\ | |
| 88 flap(5,"\000"),\ | |
| 89 flap(2,"toc_chat_leave 0\000")) | |
| 90 strings=range(USERS) | |
| 91 for i in strings: | |
| 92 strings[i]=StringIOWithoutClosing() | |
| 93 fac=toc.TOCFactory() | |
| 94 dummy=range(USERS) | |
| 95 for i in dummy: | |
| 96 dummy[i]=DummyTOC() | |
| 97 dummy[i].factory=fac | |
| 98 dummy[i].makeConnection(protocol.FileWrapper(strings[i])) | |
| 99 while reduce(lambda x,y:x+y,map(lambda x:x==(),data))!=USERS: | |
| 100 for i in range(USERS): | |
| 101 d=data[i] | |
| 102 if len(d)>0: | |
| 103 k,data[i]=d[0],d[1:] | |
| 104 for j in k: | |
| 105 dummy[i].dataReceived(j) # test by doing a character at
a time | |
| 106 else: | |
| 107 dummy[i].connectionLost(failure.Failure(main.CONNECTION_DONE
)) | |
| 108 values=range(USERS) | |
| 109 for i in values: | |
| 110 values[i]=strings[i].getvalue() | |
| 111 flaps=map(lambda x:[],range(USERS)) | |
| 112 for value in values: | |
| 113 i=values.index(value) | |
| 114 f,value=readFlap(value) | |
| 115 while f: | |
| 116 flaps[i].append(f) | |
| 117 f,value=readFlap(value) | |
| 118 ts=range(USERS) | |
| 119 for t in ts: | |
| 120 ts[t]=dummy[t].signontime | |
| 121 shouldequal=range(USERS) | |
| 122 shouldequal[0]=[ \ | |
| 123 [1,"\000\000\000\001"],\ | |
| 124 [2,"SIGN_ON:TOC1.0\000"],\ | |
| 125 [2,"NICK:test\000"],\ | |
| 126 [2,"CONFIG:\00"],\ | |
| 127 [2,"UPDATE_BUDDY:test:T:0:%s:0: O\000"%ts[0]],\ | |
| 128 [2,"IM_IN:test:F:hi\000"],\ | |
| 129 [2,"ERROR:901:test2\000"],\ | |
| 130 #[2,"UPDATE_BUDDY:test:T:0:%s:0: O\000"%ts[0]],\ | |
| 131 [2,"UPDATE_BUDDY:test:T:0:%s:0: OU\000"%ts[0]],\ | |
| 132 [2,"UPDATE_BUDDY:test:T:0:%s:10: OU\000"%ts[0]],\ | |
| 133 [2,"UPDATE_BUDDY:test:T:0:%s:0: OU\000"%ts[0]],\ | |
| 134 [2,"UPDATE_BUDDY:test:T:0:%s:0: O\000"%ts[0]],\ | |
| 135 [2,"EVILED:10:test\000"],\ | |
| 136 [2,"UPDATE_BUDDY:test:T:10:%s:0: O\000"%ts[0]],\ | |
| 137 [2,"CHAT_JOIN:0:Test Chat\000"],\ | |
| 138 [2,"CHAT_UPDATE_BUDDY:0:T:test\000"],\ | |
| 139 [2,"CHAT_IN:0:test:F:hello\000"],\ | |
| 140 [2,"CHAT_UPDATE_BUDDY:0:T:ooga\000"],\ | |
| 141 [2,"CHAT_IN:0:ooga:F:hi test\000"],\ | |
| 142 [2,"CHAT_LEFT:0\000"]] | |
| 143 shouldequal[1]=[ \ | |
| 144 [1,"\000\000\000\001"],\ | |
| 145 [2,"SIGN_ON:TOC1.0\000"],\ | |
| 146 [2,"NICK:ooga\000"],\ | |
| 147 [2,"CONFIG:\000"],\ | |
| 148 #[2,"UPDATE_BUDDY:test:T:0:%s:0: O\000"%ts[0]],\ | |
| 149 [2,"UPDATE_BUDDY:test:T:0:%s:0: OU\000"%ts[0]],\ | |
| 150 [2,"UPDATE_BUDDY:test:T:0:%s:10: OU\000"%ts[0]],\ | |
| 151 [2,"UPDATE_BUDDY:test:T:0:%s:0: OU\000"%ts[0]],\ | |
| 152 [2,"UPDATE_BUDDY:test:T:0:%s:0: O\000"%ts[0]],\ | |
| 153 [2,"UPDATE_BUDDY:test:T:10:%s:0: O\000"%ts[0]],\ | |
| 154 [2,"CHAT_INVITE:Test Chat:0:test:come\000"],\ | |
| 155 [2,"CHAT_JOIN:0:Test Chat\000"],\ | |
| 156 [2,"CHAT_UPDATE_BUDDY:0:T:test:ooga\000"],\ | |
| 157 [2,"CHAT_IN:0:ooga:F:hi test\000"],\ | |
| 158 [2,"CHAT_IN:0:test:T:boo ga\000"],\ | |
| 159 [2,"CHAT_UPDATE_BUDDY:0:F:test\000"],\ | |
| 160 [2,"CHAT_LEFT:0\000"]] | |
| 161 if flaps!=shouldequal: | |
| 162 for i in range(len(shouldequal)): | |
| 163 for j in range(len(shouldequal[i])): | |
| 164 if shouldequal[i][j]!=flaps[i][j]: | |
| 165 raise AssertionError("GeneralTest Failed!\nUser %s Line
%s\nactual:%s\nshould be:%s"%(i,j,flaps[i][j],shouldequal[i][j])) | |
| 166 raise AssertionError("GeneralTest Failed with incorrect lengths!") | |
| 167 class TOCMultiPacketTestCase(unittest.TestCase): | |
| 168 """ | |
| 169 i saw this problem when using GAIM. It only read the flaps onces per dataRe
ceived, and would basically block if it ever received two packets together in on
e dataReceived. this tests for that occurance. | |
| 170 """ | |
| 171 def testTOC(self): | |
| 172 self.runTest() | |
| 173 def runTest(self): | |
| 174 packets=["FLAPON\r\n\r\n",\ | |
| 175 flap(1,"\000\000\000\001\000\001\000\004test"),\ | |
| 176 flap(2,"toc_signon null 9999 test 0x100000 english \"penguin 0.1\"\000"
),\ | |
| 177 flap(2,"toc_init_done\000"),\ | |
| 178 flap(2,"toc_send_im test hi\000")] | |
| 179 shouldbe=[[1,"\000\000\000\001"],\ | |
| 180 [2,"SIGN_ON:TOC1.0\000"],\ | |
| 181 [2,"NICK:test\000"],\ | |
| 182 [2,"CONFIG:\000"],\ | |
| 183 [2,"IM_IN:test:F:hi\000"]] | |
| 184 data="" | |
| 185 for i in packets: | |
| 186 data=data+i | |
| 187 s=StringIOWithoutClosing() | |
| 188 d=DummyTOC() | |
| 189 fac=toc.TOCFactory() | |
| 190 d.factory=fac | |
| 191 d.makeConnection(protocol.FileWrapper(s)) | |
| 192 d.dataReceived(data) | |
| 193 d.connectionLost(failure.Failure(main.CONNECTION_DONE)) | |
| 194 value=s.getvalue() | |
| 195 flaps=[] | |
| 196 f,value=readFlap(value) | |
| 197 while f: | |
| 198 flaps.append(f) | |
| 199 f,value=readFlap(value) | |
| 200 if flaps!=shouldbe: | |
| 201 for i in range(len(flaps)): | |
| 202 if flaps[i]!=shouldbe[i]:raise AssertionError("MultiPacketTest F
ailed!\nactual:%s\nshould be:%s"%(flaps[i],shouldbe[i])) | |
| 203 raise AssertionError("MultiPacketTest Failed with incorrect length!,
printing both lists\nactual:%s\nshould be:%s"%(flaps,shouldbe)) | |
| 204 class TOCSavedValuesTestCase(unittest.TestCase): | |
| 205 def testTOC(self): | |
| 206 self.runTest() | |
| 207 def runTest(self): | |
| 208 password1=toc.roast("test pass") | |
| 209 password2=toc.roast("pass test") | |
| 210 beforesend=[\ | |
| 211 "FLAPON\r\n\r\n",\ | |
| 212 flap(1,"\000\000\000\001\000\001\000\004test"),\ | |
| 213 flap(2,"toc_signon localhost 9999 test %s english \"penguin 0.1\"\000"%
password1),\ | |
| 214 flap(2,"toc_init_done\000"),\ | |
| 215 flap(2,"toc_set_config \"{m 4}\"\000"),\ | |
| 216 flap(2,"toc_format_nickname BOOGA\000"),\ | |
| 217 flap(2,"toc_format_nickname \"T E S T\"\000"),\ | |
| 218 flap(2,"toc_change_passwd \"testpass\" \"pass test\"\000"),\ | |
| 219 flap(2,"toc_change_passwd \"test pass\" \"pass test\"\000")] | |
| 220 beforeexpect=[\ | |
| 221 [1,"\000\000\000\001"],\ | |
| 222 [2,"SIGN_ON:TOC1.0\000"],\ | |
| 223 [2,"NICK:test\000"],\ | |
| 224 [2,"CONFIG:\000"],\ | |
| 225 [2,"ERROR:911\000"],\ | |
| 226 [2,"ADMIN_NICK_STATUS:0\000"],\ | |
| 227 [2,"ERROR:911\000"],\ | |
| 228 [2,"ADMIN_PASSWD_STATUS:0\000"]] | |
| 229 badpasssend=[\ | |
| 230 "FLAPON\r\n\r\n",\ | |
| 231 flap(1,"\000\000\000\001\000\001\000\004test"),\ | |
| 232 flap(2,"toc_signon localhost 9999 test 0x1000 english \"penguin 0.1\"\0
00"),\ | |
| 233 flap(2,"toc_init_done")] | |
| 234 badpassexpect=[\ | |
| 235 [1,"\000\00\000\001"],\ | |
| 236 [2,"ERROR:980\000"]] | |
| 237 goodpasssend=[\ | |
| 238 "FLAPON\r\n\r\n",\ | |
| 239 flap(1,"\000\000\000\001\000\001\000\004test"),\ | |
| 240 flap(2,"toc_signon localhost 9999 test %s english \"penguin 0.1\"\000"%
password2),\ | |
| 241 flap(2,"toc_init_done")] | |
| 242 goodpassexpect=[\ | |
| 243 [1,"\000\000\000\001"],\ | |
| 244 [2,"SIGN_ON:TOC1.0\000"],\ | |
| 245 [2,"NICK:T E S T\000"],\ | |
| 246 [2,"CONFIG:{m 4}\000"]] | |
| 247 fac=toc.TOCFactory() | |
| 248 d=DummyTOC() | |
| 249 d.factory=fac | |
| 250 s=StringIOWithoutClosing() | |
| 251 d.makeConnection(protocol.FileWrapper(s)) | |
| 252 for i in beforesend: | |
| 253 d.dataReceived(i) | |
| 254 d.connectionLost(failure.Failure(main.CONNECTION_DONE)) | |
| 255 v=s.getvalue() | |
| 256 flaps=[] | |
| 257 f,v=readFlap(v) | |
| 258 while f: | |
| 259 flaps.append(f) | |
| 260 f,v=readFlap(v) | |
| 261 if flaps!=beforeexpect: | |
| 262 for i in range(len(flaps)): | |
| 263 if flaps[i]!=beforeexpect[i]: | |
| 264 raise AssertionError("SavedValuesTest Before Failed!\nactual
:%s\nshould be:%s"%(flaps[i],beforeexpect[i])) | |
| 265 raise AssertionError("SavedValuesTest Before Failed with incorrect l
ength!\nactual:%s\nshould be:%s"%(flaps,beforeexpect)) | |
| 266 d=DummyTOC() | |
| 267 d.factory=fac | |
| 268 s=StringIOWithoutClosing() | |
| 269 d.makeConnection(protocol.FileWrapper(s)) | |
| 270 for i in badpasssend: | |
| 271 d.dataReceived(i) | |
| 272 d.connectionLost(failure.Failure(main.CONNECTION_DONE)) | |
| 273 v=s.getvalue() | |
| 274 flaps=[] | |
| 275 f,v=readFlap(v) | |
| 276 while f: | |
| 277 flaps.append(f) | |
| 278 f,v=readFlap(v) | |
| 279 if flaps!=badpassexpect: | |
| 280 for i in range(len(flaps)): | |
| 281 if flaps[i]!=badpassexpect[i]: | |
| 282 raise AssertionError("SavedValuesTest BadPass Failed!\nactua
l:%s\nshould be:%s"%(flaps[i],badpassexpect[i])) | |
| 283 raise AssertionError("SavedValuesTest BadPass Failed with incorrect
length!\nactual:%s\nshould be:%s"%(flaps,badpassexpect)) | |
| 284 d=DummyTOC() | |
| 285 d.factory=fac | |
| 286 s=StringIOWithoutClosing() | |
| 287 d.makeConnection(protocol.FileWrapper(s)) | |
| 288 for i in goodpasssend: | |
| 289 d.dataReceived(i) | |
| 290 d.connectionLost(failure.Failure(main.CONNECTION_DONE)) | |
| 291 v=s.getvalue() | |
| 292 flaps=[] | |
| 293 f,v=readFlap(v) | |
| 294 while f: | |
| 295 flaps.append(f) | |
| 296 f,v=readFlap(v) | |
| 297 if flaps!=goodpassexpect: | |
| 298 for i in range(len(flaps)): | |
| 299 if flaps[i]!=goodpassexpect[i]: | |
| 300 raise AssertionError("SavedValuesTest GoodPass Failed!\nactu
al:%s\nshould be:%s"%(flaps[i],goodpassexpect[i])) | |
| 301 raise AssertionError("SavedValuesTest GoodPass Failed with incorrect
length!\nactual:%s\nshould be:%s"%(flaps,beforeexpect)) | |
| 302 class TOCPrivacyTestCase(unittest.TestCase): | |
| 303 def runTest(self): | |
| 304 sends=["FLAPON\r\n\r\n",\ | |
| 305 flap(1,"\000\000\000\001\000\001\000\004test"),\ | |
| 306 flap(2,"toc_signon localhost 9999 test 0x00 english penguin\000"),\ | |
| 307 flap(2,"toc_init_done\000"),\ | |
| 308 flap(2,"toc_add_deny\000"),\ | |
| 309 flap(2,"toc_send_im test 1\000"),\ | |
| 310 flap(2,"toc_add_deny test\000"),\ | |
| 311 flap(2,"toc_send_im test 2\000"),\ | |
| 312 flap(2,"toc_add_permit\000"),\ | |
| 313 flap(2,"toc_send_im test 3\000"),\ | |
| 314 flap(2,"toc_add_permit test\000"),\ | |
| 315 flap(2,"toc_send_im test 4\000")] | |
| 316 expect=[[1,"\000\000\000\001"],\ | |
| 317 [2,"SIGN_ON:TOC1.0\000"],\ | |
| 318 [2,"NICK:test\000"],\ | |
| 319 [2,"CONFIG:\000"],\ | |
| 320 [2,"IM_IN:test:F:1\000"],\ | |
| 321 [2,"ERROR:901:test\000"],\ | |
| 322 [2,"ERROR:901:test\000"],\ | |
| 323 [2,"IM_IN:test:F:4\000"]] | |
| 324 d=DummyTOC() | |
| 325 d.factory=toc.TOCFactory() | |
| 326 s=StringIOWithoutClosing() | |
| 327 d.makeConnection(protocol.FileWrapper(s)) | |
| 328 for i in sends: | |
| 329 d.dataReceived(i) | |
| 330 d.connectionLost(failure.Failure(main.CONNECTION_DONE)) | |
| 331 v=s.getvalue() | |
| 332 flaps=[] | |
| 333 f,v=readFlap(v) | |
| 334 while f: | |
| 335 flaps.append(f) | |
| 336 f,v=readFlap(v) | |
| 337 if flaps!=expect: | |
| 338 for i in range(len(flaps)): | |
| 339 if flaps[i]!=expect[i]: | |
| 340 raise AssertionError("PrivacyTest Before Failed!\nactual:%s\
nshould be:%s"%(flaps[i],expect[i])) | |
| 341 raise AssertionError("PrivacyTest Before Failed with incorrect lengt
h!\nactual:%s\nshould be:%s"%(flaps,expect)) | |
| 342 testCases=[TOCGeneralTestCase,TOCMultiPacketTestCase,TOCSavedValuesTestCase,TOCP
rivacyTestCase] | |
| 343 | |
| OLD | NEW |