OLD | NEW |
| (Empty) |
1 # -*- test-case-name: twisted.test.test_sip -*- | |
2 # Copyright (c) 2001-2004 Twisted Matrix Laboratories. | |
3 # See LICENSE for details. | |
4 | |
5 | |
6 """Session Initialization Protocol tests.""" | |
7 | |
8 from twisted.trial import unittest | |
9 from twisted.protocols import sip | |
10 from twisted.internet import defer, reactor | |
11 | |
12 from twisted.test import proto_helpers | |
13 | |
14 from twisted import cred | |
15 import twisted.cred.portal | |
16 import twisted.cred.checkers | |
17 | |
18 from zope.interface import implements | |
19 | |
20 | |
21 # request, prefixed by random CRLFs | |
22 request1 = "\n\r\n\n\r" + """\ | |
23 INVITE sip:foo SIP/2.0 | |
24 From: mo | |
25 To: joe | |
26 Content-Length: 4 | |
27 | |
28 abcd""".replace("\n", "\r\n") | |
29 | |
30 # request, no content-length | |
31 request2 = """INVITE sip:foo SIP/2.0 | |
32 From: mo | |
33 To: joe | |
34 | |
35 1234""".replace("\n", "\r\n") | |
36 | |
37 # request, with garbage after | |
38 request3 = """INVITE sip:foo SIP/2.0 | |
39 From: mo | |
40 To: joe | |
41 Content-Length: 4 | |
42 | |
43 1234 | |
44 | |
45 lalalal""".replace("\n", "\r\n") | |
46 | |
47 # three requests | |
48 request4 = """INVITE sip:foo SIP/2.0 | |
49 From: mo | |
50 To: joe | |
51 Content-Length: 0 | |
52 | |
53 INVITE sip:loop SIP/2.0 | |
54 From: foo | |
55 To: bar | |
56 Content-Length: 4 | |
57 | |
58 abcdINVITE sip:loop SIP/2.0 | |
59 From: foo | |
60 To: bar | |
61 Content-Length: 4 | |
62 | |
63 1234""".replace("\n", "\r\n") | |
64 | |
65 # response, no content | |
66 response1 = """SIP/2.0 200 OK | |
67 From: foo | |
68 To:bar | |
69 Content-Length: 0 | |
70 | |
71 """.replace("\n", "\r\n") | |
72 | |
73 # short header version | |
74 request_short = """\ | |
75 INVITE sip:foo SIP/2.0 | |
76 f: mo | |
77 t: joe | |
78 l: 4 | |
79 | |
80 abcd""".replace("\n", "\r\n") | |
81 | |
82 request_natted = """\ | |
83 INVITE sip:foo SIP/2.0 | |
84 Via: SIP/2.0/UDP 10.0.0.1:5060;rport | |
85 | |
86 """.replace("\n", "\r\n") | |
87 | |
88 class TestRealm: | |
89 def requestAvatar(self, avatarId, mind, *interfaces): | |
90 return sip.IContact, None, lambda: None | |
91 | |
92 class MessageParsingTestCase(unittest.TestCase): | |
93 def setUp(self): | |
94 self.l = [] | |
95 self.parser = sip.MessagesParser(self.l.append) | |
96 | |
97 def feedMessage(self, message): | |
98 self.parser.dataReceived(message) | |
99 self.parser.dataDone() | |
100 | |
101 def validateMessage(self, m, method, uri, headers, body): | |
102 """Validate Requests.""" | |
103 self.assertEquals(m.method, method) | |
104 self.assertEquals(m.uri.toString(), uri) | |
105 self.assertEquals(m.headers, headers) | |
106 self.assertEquals(m.body, body) | |
107 self.assertEquals(m.finished, 1) | |
108 | |
109 def testSimple(self): | |
110 l = self.l | |
111 self.feedMessage(request1) | |
112 self.assertEquals(len(l), 1) | |
113 self.validateMessage(l[0], "INVITE", "sip:foo", | |
114 {"from": ["mo"], "to": ["joe"], "content-length": [
"4"]}, | |
115 "abcd") | |
116 | |
117 def testTwoMessages(self): | |
118 l = self.l | |
119 self.feedMessage(request1) | |
120 self.feedMessage(request2) | |
121 self.assertEquals(len(l), 2) | |
122 self.validateMessage(l[0], "INVITE", "sip:foo", | |
123 {"from": ["mo"], "to": ["joe"], "content-length": [
"4"]}, | |
124 "abcd") | |
125 self.validateMessage(l[1], "INVITE", "sip:foo", | |
126 {"from": ["mo"], "to": ["joe"]}, | |
127 "1234") | |
128 | |
129 def testGarbage(self): | |
130 l = self.l | |
131 self.feedMessage(request3) | |
132 self.assertEquals(len(l), 1) | |
133 self.validateMessage(l[0], "INVITE", "sip:foo", | |
134 {"from": ["mo"], "to": ["joe"], "content-length": [
"4"]}, | |
135 "1234") | |
136 | |
137 def testThreeInOne(self): | |
138 l = self.l | |
139 self.feedMessage(request4) | |
140 self.assertEquals(len(l), 3) | |
141 self.validateMessage(l[0], "INVITE", "sip:foo", | |
142 {"from": ["mo"], "to": ["joe"], "content-length": [
"0"]}, | |
143 "") | |
144 self.validateMessage(l[1], "INVITE", "sip:loop", | |
145 {"from": ["foo"], "to": ["bar"], "content-length":
["4"]}, | |
146 "abcd") | |
147 self.validateMessage(l[2], "INVITE", "sip:loop", | |
148 {"from": ["foo"], "to": ["bar"], "content-length":
["4"]}, | |
149 "1234") | |
150 | |
151 def testShort(self): | |
152 l = self.l | |
153 self.feedMessage(request_short) | |
154 self.assertEquals(len(l), 1) | |
155 self.validateMessage(l[0], "INVITE", "sip:foo", | |
156 {"from": ["mo"], "to": ["joe"], "content-length": [
"4"]}, | |
157 "abcd") | |
158 | |
159 def testSimpleResponse(self): | |
160 l = self.l | |
161 self.feedMessage(response1) | |
162 self.assertEquals(len(l), 1) | |
163 m = l[0] | |
164 self.assertEquals(m.code, 200) | |
165 self.assertEquals(m.phrase, "OK") | |
166 self.assertEquals(m.headers, {"from": ["foo"], "to": ["bar"], "content-l
ength": ["0"]}) | |
167 self.assertEquals(m.body, "") | |
168 self.assertEquals(m.finished, 1) | |
169 | |
170 | |
171 class MessageParsingTestCase2(MessageParsingTestCase): | |
172 """Same as base class, but feed data char by char.""" | |
173 | |
174 def feedMessage(self, message): | |
175 for c in message: | |
176 self.parser.dataReceived(c) | |
177 self.parser.dataDone() | |
178 | |
179 | |
180 class MakeMessageTestCase(unittest.TestCase): | |
181 | |
182 def testRequest(self): | |
183 r = sip.Request("INVITE", "sip:foo") | |
184 r.addHeader("foo", "bar") | |
185 self.assertEquals(r.toString(), "INVITE sip:foo SIP/2.0\r\nFoo: bar\r\n\
r\n") | |
186 | |
187 def testResponse(self): | |
188 r = sip.Response(200, "OK") | |
189 r.addHeader("foo", "bar") | |
190 r.addHeader("Content-Length", "4") | |
191 r.bodyDataReceived("1234") | |
192 self.assertEquals(r.toString(), "SIP/2.0 200 OK\r\nFoo: bar\r\nContent-L
ength: 4\r\n\r\n1234") | |
193 | |
194 def testStatusCode(self): | |
195 r = sip.Response(200) | |
196 self.assertEquals(r.toString(), "SIP/2.0 200 OK\r\n\r\n") | |
197 | |
198 | |
199 class ViaTestCase(unittest.TestCase): | |
200 | |
201 def checkRoundtrip(self, v): | |
202 s = v.toString() | |
203 self.assertEquals(s, sip.parseViaHeader(s).toString()) | |
204 | |
205 def testExtraWhitespace(self): | |
206 v1 = sip.parseViaHeader('SIP/2.0/UDP 192.168.1.1:5060') | |
207 v2 = sip.parseViaHeader('SIP/2.0/UDP 192.168.1.1:5060') | |
208 self.assertEquals(v1.transport, v2.transport) | |
209 self.assertEquals(v1.host, v2.host) | |
210 self.assertEquals(v1.port, v2.port) | |
211 | |
212 def testComplex(self): | |
213 s = "SIP/2.0/UDP first.example.com:4000;ttl=16;maddr=224.2.0.1 ;branch=a
7c6a8dlze (Example)" | |
214 v = sip.parseViaHeader(s) | |
215 self.assertEquals(v.transport, "UDP") | |
216 self.assertEquals(v.host, "first.example.com") | |
217 self.assertEquals(v.port, 4000) | |
218 self.assertEquals(v.ttl, 16) | |
219 self.assertEquals(v.maddr, "224.2.0.1") | |
220 self.assertEquals(v.branch, "a7c6a8dlze") | |
221 self.assertEquals(v.hidden, 0) | |
222 self.assertEquals(v.toString(), | |
223 "SIP/2.0/UDP first.example.com:4000;ttl=16;branch=a7c6
a8dlze;maddr=224.2.0.1") | |
224 self.checkRoundtrip(v) | |
225 | |
226 def testSimple(self): | |
227 s = "SIP/2.0/UDP example.com;hidden" | |
228 v = sip.parseViaHeader(s) | |
229 self.assertEquals(v.transport, "UDP") | |
230 self.assertEquals(v.host, "example.com") | |
231 self.assertEquals(v.port, 5060) | |
232 self.assertEquals(v.ttl, None) | |
233 self.assertEquals(v.maddr, None) | |
234 self.assertEquals(v.branch, None) | |
235 self.assertEquals(v.hidden, 1) | |
236 self.assertEquals(v.toString(), | |
237 "SIP/2.0/UDP example.com:5060;hidden") | |
238 self.checkRoundtrip(v) | |
239 | |
240 def testSimpler(self): | |
241 v = sip.Via("example.com") | |
242 self.checkRoundtrip(v) | |
243 | |
244 def testRPort(self): | |
245 v = sip.Via("foo.bar", rport=True) | |
246 self.assertEquals(v.toString(), "SIP/2.0/UDP foo.bar:5060;rport") | |
247 | |
248 def testNAT(self): | |
249 s = "SIP/2.0/UDP 10.0.0.1:5060;received=22.13.1.5;rport=12345" | |
250 v = sip.parseViaHeader(s) | |
251 self.assertEquals(v.transport, "UDP") | |
252 self.assertEquals(v.host, "10.0.0.1") | |
253 self.assertEquals(v.port, 5060) | |
254 self.assertEquals(v.received, "22.13.1.5") | |
255 self.assertEquals(v.rport, 12345) | |
256 | |
257 self.assertNotEquals(v.toString().find("rport=12345"), -1) | |
258 | |
259 class URLTestCase(unittest.TestCase): | |
260 | |
261 def testRoundtrip(self): | |
262 for url in [ | |
263 "sip:j.doe@big.com", | |
264 "sip:j.doe:secret@big.com;transport=tcp", | |
265 "sip:j.doe@big.com?subject=project", | |
266 "sip:example.com", | |
267 ]: | |
268 self.assertEquals(sip.parseURL(url).toString(), url) | |
269 | |
270 def testComplex(self): | |
271 s = ("sip:user:pass@hosta:123;transport=udp;user=phone;method=foo;" | |
272 "ttl=12;maddr=1.2.3.4;blah;goo=bar?a=b&c=d") | |
273 url = sip.parseURL(s) | |
274 for k, v in [("username", "user"), ("password", "pass"), | |
275 ("host", "hosta"), ("port", 123), | |
276 ("transport", "udp"), ("usertype", "phone"), | |
277 ("method", "foo"), ("ttl", 12), | |
278 ("maddr", "1.2.3.4"), ("other", ["blah", "goo=bar"]), | |
279 ("headers", {"a": "b", "c": "d"})]: | |
280 self.assertEquals(getattr(url, k), v) | |
281 | |
282 | |
283 class ParseTestCase(unittest.TestCase): | |
284 | |
285 def testParseAddress(self): | |
286 for address, name, urls, params in [ | |
287 ('"A. G. Bell" <sip:foo@example.com>', "A. G. Bell", "sip:foo@exampl
e.com", {}), | |
288 ("Anon <sip:foo@example.com>", "Anon", "sip:foo@example.com", {}), | |
289 ("sip:foo@example.com", "", "sip:foo@example.com", {}), | |
290 ("<sip:foo@example.com>", "", "sip:foo@example.com", {}), | |
291 ("foo <sip:foo@example.com>;tag=bar;foo=baz", "foo", "sip:foo@exampl
e.com", {"tag": "bar", "foo": "baz"}), | |
292 ]: | |
293 gname, gurl, gparams = sip.parseAddress(address) | |
294 self.assertEquals(name, gname) | |
295 self.assertEquals(gurl.toString(), urls) | |
296 self.assertEquals(gparams, params) | |
297 | |
298 | |
299 class DummyLocator: | |
300 implements(sip.ILocator) | |
301 def getAddress(self, logicalURL): | |
302 return defer.succeed(sip.URL("server.com", port=5060)) | |
303 | |
304 class FailingLocator: | |
305 implements(sip.ILocator) | |
306 def getAddress(self, logicalURL): | |
307 return defer.fail(LookupError()) | |
308 | |
309 | |
310 class ProxyTestCase(unittest.TestCase): | |
311 | |
312 def setUp(self): | |
313 self.proxy = sip.Proxy("127.0.0.1") | |
314 self.proxy.locator = DummyLocator() | |
315 self.sent = [] | |
316 self.proxy.sendMessage = lambda dest, msg: self.sent.append((dest, msg)) | |
317 | |
318 def testRequestForward(self): | |
319 r = sip.Request("INVITE", "sip:foo") | |
320 r.addHeader("via", sip.Via("1.2.3.4").toString()) | |
321 r.addHeader("via", sip.Via("1.2.3.5").toString()) | |
322 r.addHeader("foo", "bar") | |
323 r.addHeader("to", "<sip:joe@server.com>") | |
324 r.addHeader("contact", "<sip:joe@1.2.3.5>") | |
325 self.proxy.datagramReceived(r.toString(), ("1.2.3.4", 5060)) | |
326 self.assertEquals(len(self.sent), 1) | |
327 dest, m = self.sent[0] | |
328 self.assertEquals(dest.port, 5060) | |
329 self.assertEquals(dest.host, "server.com") | |
330 self.assertEquals(m.uri.toString(), "sip:foo") | |
331 self.assertEquals(m.method, "INVITE") | |
332 self.assertEquals(m.headers["via"], | |
333 ["SIP/2.0/UDP 127.0.0.1:5060", | |
334 "SIP/2.0/UDP 1.2.3.4:5060", | |
335 "SIP/2.0/UDP 1.2.3.5:5060"]) | |
336 | |
337 | |
338 def testReceivedRequestForward(self): | |
339 r = sip.Request("INVITE", "sip:foo") | |
340 r.addHeader("via", sip.Via("1.2.3.4").toString()) | |
341 r.addHeader("foo", "bar") | |
342 r.addHeader("to", "<sip:joe@server.com>") | |
343 r.addHeader("contact", "<sip:joe@1.2.3.4>") | |
344 self.proxy.datagramReceived(r.toString(), ("1.1.1.1", 5060)) | |
345 dest, m = self.sent[0] | |
346 self.assertEquals(m.headers["via"], | |
347 ["SIP/2.0/UDP 127.0.0.1:5060", | |
348 "SIP/2.0/UDP 1.2.3.4:5060;received=1.1.1.1"]) | |
349 | |
350 | |
351 def testResponseWrongVia(self): | |
352 # first via must match proxy's address | |
353 r = sip.Response(200) | |
354 r.addHeader("via", sip.Via("foo.com").toString()) | |
355 self.proxy.datagramReceived(r.toString(), ("1.1.1.1", 5060)) | |
356 self.assertEquals(len(self.sent), 0) | |
357 | |
358 def testResponseForward(self): | |
359 r = sip.Response(200) | |
360 r.addHeader("via", sip.Via("127.0.0.1").toString()) | |
361 r.addHeader("via", sip.Via("client.com", port=1234).toString()) | |
362 self.proxy.datagramReceived(r.toString(), ("1.1.1.1", 5060)) | |
363 self.assertEquals(len(self.sent), 1) | |
364 dest, m = self.sent[0] | |
365 self.assertEquals((dest.host, dest.port), ("client.com", 1234)) | |
366 self.assertEquals(m.code, 200) | |
367 self.assertEquals(m.headers["via"], ["SIP/2.0/UDP client.com:1234"]) | |
368 | |
369 def testReceivedResponseForward(self): | |
370 r = sip.Response(200) | |
371 r.addHeader("via", sip.Via("127.0.0.1").toString()) | |
372 r.addHeader("via", sip.Via("10.0.0.1", received="client.com").toString()
) | |
373 self.proxy.datagramReceived(r.toString(), ("1.1.1.1", 5060)) | |
374 self.assertEquals(len(self.sent), 1) | |
375 dest, m = self.sent[0] | |
376 self.assertEquals((dest.host, dest.port), ("client.com", 5060)) | |
377 | |
378 def testResponseToUs(self): | |
379 r = sip.Response(200) | |
380 r.addHeader("via", sip.Via("127.0.0.1").toString()) | |
381 l = [] | |
382 self.proxy.gotResponse = lambda *a: l.append(a) | |
383 self.proxy.datagramReceived(r.toString(), ("1.1.1.1", 5060)) | |
384 self.assertEquals(len(l), 1) | |
385 m, addr = l[0] | |
386 self.assertEquals(len(m.headers.get("via", [])), 0) | |
387 self.assertEquals(m.code, 200) | |
388 | |
389 def testLoop(self): | |
390 r = sip.Request("INVITE", "sip:foo") | |
391 r.addHeader("via", sip.Via("1.2.3.4").toString()) | |
392 r.addHeader("via", sip.Via("127.0.0.1").toString()) | |
393 self.proxy.datagramReceived(r.toString(), ("client.com", 5060)) | |
394 self.assertEquals(self.sent, []) | |
395 | |
396 def testCantForwardRequest(self): | |
397 r = sip.Request("INVITE", "sip:foo") | |
398 r.addHeader("via", sip.Via("1.2.3.4").toString()) | |
399 r.addHeader("to", "<sip:joe@server.com>") | |
400 self.proxy.locator = FailingLocator() | |
401 self.proxy.datagramReceived(r.toString(), ("1.2.3.4", 5060)) | |
402 self.assertEquals(len(self.sent), 1) | |
403 dest, m = self.sent[0] | |
404 self.assertEquals((dest.host, dest.port), ("1.2.3.4", 5060)) | |
405 self.assertEquals(m.code, 404) | |
406 self.assertEquals(m.headers["via"], ["SIP/2.0/UDP 1.2.3.4:5060"]) | |
407 | |
408 def testCantForwardResponse(self): | |
409 pass | |
410 | |
411 #testCantForwardResponse.skip = "not implemented yet" | |
412 | |
413 | |
414 class RegistrationTestCase(unittest.TestCase): | |
415 | |
416 def setUp(self): | |
417 self.proxy = sip.RegisterProxy(host="127.0.0.1") | |
418 self.registry = sip.InMemoryRegistry("bell.example.com") | |
419 self.proxy.registry = self.proxy.locator = self.registry | |
420 self.sent = [] | |
421 self.proxy.sendMessage = lambda dest, msg: self.sent.append((dest, msg)) | |
422 | |
423 def tearDown(self): | |
424 for d, uri in self.registry.users.values(): | |
425 d.cancel() | |
426 del self.proxy | |
427 | |
428 def register(self): | |
429 r = sip.Request("REGISTER", "sip:bell.example.com") | |
430 r.addHeader("to", "sip:joe@bell.example.com") | |
431 r.addHeader("contact", "sip:joe@client.com:1234") | |
432 r.addHeader("via", sip.Via("client.com").toString()) | |
433 self.proxy.datagramReceived(r.toString(), ("client.com", 5060)) | |
434 | |
435 def unregister(self): | |
436 r = sip.Request("REGISTER", "sip:bell.example.com") | |
437 r.addHeader("to", "sip:joe@bell.example.com") | |
438 r.addHeader("contact", "*") | |
439 r.addHeader("via", sip.Via("client.com").toString()) | |
440 r.addHeader("expires", "0") | |
441 self.proxy.datagramReceived(r.toString(), ("client.com", 5060)) | |
442 | |
443 def testRegister(self): | |
444 self.register() | |
445 dest, m = self.sent[0] | |
446 self.assertEquals((dest.host, dest.port), ("client.com", 5060)) | |
447 self.assertEquals(m.code, 200) | |
448 self.assertEquals(m.headers["via"], ["SIP/2.0/UDP client.com:5060"]) | |
449 self.assertEquals(m.headers["to"], ["sip:joe@bell.example.com"]) | |
450 self.assertEquals(m.headers["contact"], ["sip:joe@client.com:5060"]) | |
451 self.failUnless(int(m.headers["expires"][0]) in (3600, 3601, 3599, 3598)
) | |
452 self.assertEquals(len(self.registry.users), 1) | |
453 dc, uri = self.registry.users["joe"] | |
454 self.assertEquals(uri.toString(), "sip:joe@client.com:5060") | |
455 d = self.proxy.locator.getAddress(sip.URL(username="joe", | |
456 host="bell.example.com")) | |
457 d.addCallback(lambda desturl : (desturl.host, desturl.port)) | |
458 d.addCallback(self.assertEquals, ('client.com', 5060)) | |
459 return d | |
460 | |
461 def testUnregister(self): | |
462 self.register() | |
463 self.unregister() | |
464 dest, m = self.sent[1] | |
465 self.assertEquals((dest.host, dest.port), ("client.com", 5060)) | |
466 self.assertEquals(m.code, 200) | |
467 self.assertEquals(m.headers["via"], ["SIP/2.0/UDP client.com:5060"]) | |
468 self.assertEquals(m.headers["to"], ["sip:joe@bell.example.com"]) | |
469 self.assertEquals(m.headers["contact"], ["sip:joe@client.com:5060"]) | |
470 self.assertEquals(m.headers["expires"], ["0"]) | |
471 self.assertEquals(self.registry.users, {}) | |
472 | |
473 | |
474 def addPortal(self): | |
475 r = TestRealm() | |
476 p = cred.portal.Portal(r) | |
477 c = cred.checkers.InMemoryUsernamePasswordDatabaseDontUse() | |
478 c.addUser('userXname@127.0.0.1', 'passXword') | |
479 p.registerChecker(c) | |
480 self.proxy.portal = p | |
481 | |
482 def testFailedAuthentication(self): | |
483 self.addPortal() | |
484 self.register() | |
485 | |
486 self.assertEquals(len(self.registry.users), 0) | |
487 self.assertEquals(len(self.sent), 1) | |
488 dest, m = self.sent[0] | |
489 self.assertEquals(m.code, 401) | |
490 | |
491 | |
492 def testBasicAuthentication(self): | |
493 self.addPortal() | |
494 self.proxy.authorizers = self.proxy.authorizers.copy() | |
495 self.proxy.authorizers['basic'] = sip.BasicAuthorizer() | |
496 | |
497 r = sip.Request("REGISTER", "sip:bell.example.com") | |
498 r.addHeader("to", "sip:joe@bell.example.com") | |
499 r.addHeader("contact", "sip:joe@client.com:1234") | |
500 r.addHeader("via", sip.Via("client.com").toString()) | |
501 r.addHeader("authorization", "Basic " + "userXname:passXword".encode('ba
se64')) | |
502 self.proxy.datagramReceived(r.toString(), ("client.com", 5060)) | |
503 | |
504 self.assertEquals(len(self.registry.users), 1) | |
505 self.assertEquals(len(self.sent), 1) | |
506 dest, m = self.sent[0] | |
507 self.assertEquals(m.code, 200) | |
508 | |
509 | |
510 def testFailedBasicAuthentication(self): | |
511 self.addPortal() | |
512 self.proxy.authorizers = self.proxy.authorizers.copy() | |
513 self.proxy.authorizers['basic'] = sip.BasicAuthorizer() | |
514 | |
515 r = sip.Request("REGISTER", "sip:bell.example.com") | |
516 r.addHeader("to", "sip:joe@bell.example.com") | |
517 r.addHeader("contact", "sip:joe@client.com:1234") | |
518 r.addHeader("via", sip.Via("client.com").toString()) | |
519 r.addHeader("authorization", "Basic " + "userXname:password".encode('bas
e64')) | |
520 self.proxy.datagramReceived(r.toString(), ("client.com", 5060)) | |
521 | |
522 self.assertEquals(len(self.registry.users), 0) | |
523 self.assertEquals(len(self.sent), 1) | |
524 dest, m = self.sent[0] | |
525 self.assertEquals(m.code, 401) | |
526 | |
527 def testWrongDomainRegister(self): | |
528 r = sip.Request("REGISTER", "sip:wrong.com") | |
529 r.addHeader("to", "sip:joe@bell.example.com") | |
530 r.addHeader("contact", "sip:joe@client.com:1234") | |
531 r.addHeader("via", sip.Via("client.com").toString()) | |
532 self.proxy.datagramReceived(r.toString(), ("client.com", 5060)) | |
533 self.assertEquals(len(self.sent), 0) | |
534 | |
535 def testWrongToDomainRegister(self): | |
536 r = sip.Request("REGISTER", "sip:bell.example.com") | |
537 r.addHeader("to", "sip:joe@foo.com") | |
538 r.addHeader("contact", "sip:joe@client.com:1234") | |
539 r.addHeader("via", sip.Via("client.com").toString()) | |
540 self.proxy.datagramReceived(r.toString(), ("client.com", 5060)) | |
541 self.assertEquals(len(self.sent), 0) | |
542 | |
543 def testWrongDomainLookup(self): | |
544 self.register() | |
545 url = sip.URL(username="joe", host="foo.com") | |
546 d = self.proxy.locator.getAddress(url) | |
547 self.assertFailure(d, LookupError) | |
548 return d | |
549 | |
550 def testNoContactLookup(self): | |
551 self.register() | |
552 url = sip.URL(username="jane", host="bell.example.com") | |
553 d = self.proxy.locator.getAddress(url) | |
554 self.assertFailure(d, LookupError) | |
555 return d | |
556 | |
557 | |
558 class Client(sip.Base): | |
559 | |
560 def __init__(self): | |
561 sip.Base.__init__(self) | |
562 self.received = [] | |
563 self.deferred = defer.Deferred() | |
564 | |
565 def handle_response(self, response, addr): | |
566 self.received.append(response) | |
567 self.deferred.callback(self.received) | |
568 | |
569 | |
570 class LiveTest(unittest.TestCase): | |
571 | |
572 def setUp(self): | |
573 self.proxy = sip.RegisterProxy(host="127.0.0.1") | |
574 self.registry = sip.InMemoryRegistry("bell.example.com") | |
575 self.proxy.registry = self.proxy.locator = self.registry | |
576 self.serverPort = reactor.listenUDP(0, self.proxy, interface="127.0.0.1"
) | |
577 self.client = Client() | |
578 self.clientPort = reactor.listenUDP(0, self.client, interface="127.0.0.1
") | |
579 self.serverAddress = (self.serverPort.getHost().host, | |
580 self.serverPort.getHost().port) | |
581 | |
582 def tearDown(self): | |
583 for d, uri in self.registry.users.values(): | |
584 d.cancel() | |
585 d1 = defer.maybeDeferred(self.clientPort.stopListening) | |
586 d2 = defer.maybeDeferred(self.serverPort.stopListening) | |
587 return defer.gatherResults([d1, d2]) | |
588 | |
589 def testRegister(self): | |
590 p = self.clientPort.getHost().port | |
591 r = sip.Request("REGISTER", "sip:bell.example.com") | |
592 r.addHeader("to", "sip:joe@bell.example.com") | |
593 r.addHeader("contact", "sip:joe@127.0.0.1:%d" % p) | |
594 r.addHeader("via", sip.Via("127.0.0.1", port=p).toString()) | |
595 self.client.sendMessage(sip.URL(host="127.0.0.1", port=self.serverAddres
s[1]), | |
596 r) | |
597 d = self.client.deferred | |
598 def check(received): | |
599 self.assertEquals(len(received), 1) | |
600 r = received[0] | |
601 self.assertEquals(r.code, 200) | |
602 d.addCallback(check) | |
603 return d | |
604 | |
605 def testAmoralRPort(self): | |
606 # rport is allowed without a value, apparently because server | |
607 # implementors might be too stupid to check the received port | |
608 # against 5060 and see if they're equal, and because client | |
609 # implementors might be too stupid to bind to port 5060, or set a | |
610 # value on the rport parameter they send if they bind to another | |
611 # port. | |
612 p = self.clientPort.getHost().port | |
613 r = sip.Request("REGISTER", "sip:bell.example.com") | |
614 r.addHeader("to", "sip:joe@bell.example.com") | |
615 r.addHeader("contact", "sip:joe@127.0.0.1:%d" % p) | |
616 r.addHeader("via", sip.Via("127.0.0.1", port=p, rport=True).toString()) | |
617 self.client.sendMessage(sip.URL(host="127.0.0.1", port=self.serverAddres
s[1]), | |
618 r) | |
619 d = self.client.deferred | |
620 def check(received): | |
621 self.assertEquals(len(received), 1) | |
622 r = received[0] | |
623 self.assertEquals(r.code, 200) | |
624 d.addCallback(check) | |
625 return d | |
626 | |
627 | |
628 registerRequest = """ | |
629 REGISTER sip:intarweb.us SIP/2.0\r | |
630 Via: SIP/2.0/UDP 192.168.1.100:50609\r | |
631 From: <sip:exarkun@intarweb.us:50609>\r | |
632 To: <sip:exarkun@intarweb.us:50609>\r | |
633 Contact: "exarkun" <sip:exarkun@192.168.1.100:50609>\r | |
634 Call-ID: 94E7E5DAF39111D791C6000393764646@intarweb.us\r | |
635 CSeq: 9898 REGISTER\r | |
636 Expires: 500\r | |
637 User-Agent: X-Lite build 1061\r | |
638 Content-Length: 0\r | |
639 \r | |
640 """ | |
641 | |
642 challengeResponse = """\ | |
643 SIP/2.0 401 Unauthorized\r | |
644 Via: SIP/2.0/UDP 192.168.1.100:50609;received=127.0.0.1;rport=5632\r | |
645 To: <sip:exarkun@intarweb.us:50609>\r | |
646 From: <sip:exarkun@intarweb.us:50609>\r | |
647 Call-ID: 94E7E5DAF39111D791C6000393764646@intarweb.us\r | |
648 CSeq: 9898 REGISTER\r | |
649 WWW-Authenticate: Digest nonce="92956076410767313901322208775",opaque="167418642
8",qop-options="auth",algorithm="MD5",realm="intarweb.us"\r | |
650 \r | |
651 """ | |
652 | |
653 authRequest = """\ | |
654 REGISTER sip:intarweb.us SIP/2.0\r | |
655 Via: SIP/2.0/UDP 192.168.1.100:50609\r | |
656 From: <sip:exarkun@intarweb.us:50609>\r | |
657 To: <sip:exarkun@intarweb.us:50609>\r | |
658 Contact: "exarkun" <sip:exarkun@192.168.1.100:50609>\r | |
659 Call-ID: 94E7E5DAF39111D791C6000393764646@intarweb.us\r | |
660 CSeq: 9899 REGISTER\r | |
661 Expires: 500\r | |
662 Authorization: Digest username="exarkun",realm="intarweb.us",nonce="929560764107
67313901322208775",response="4a47980eea31694f997369214292374b",uri="sip:intarweb
.us",algorithm=MD5,opaque="1674186428"\r | |
663 User-Agent: X-Lite build 1061\r | |
664 Content-Length: 0\r | |
665 \r | |
666 """ | |
667 | |
668 okResponse = """\ | |
669 SIP/2.0 200 OK\r | |
670 Via: SIP/2.0/UDP 192.168.1.100:50609;received=127.0.0.1;rport=5632\r | |
671 To: <sip:exarkun@intarweb.us:50609>\r | |
672 From: <sip:exarkun@intarweb.us:50609>\r | |
673 Call-ID: 94E7E5DAF39111D791C6000393764646@intarweb.us\r | |
674 CSeq: 9899 REGISTER\r | |
675 Contact: sip:exarkun@127.0.0.1:5632\r | |
676 Expires: 3600\r | |
677 Content-Length: 0\r | |
678 \r | |
679 """ | |
680 | |
681 class FakeDigestAuthorizer(sip.DigestAuthorizer): | |
682 def generateNonce(self): | |
683 return '92956076410767313901322208775' | |
684 def generateOpaque(self): | |
685 return '1674186428' | |
686 | |
687 | |
688 class FakeRegistry(sip.InMemoryRegistry): | |
689 """Make sure expiration is always seen to be 3600. | |
690 | |
691 Otherwise slow reactors fail tests incorrectly. | |
692 """ | |
693 | |
694 def _cbReg(self, reg): | |
695 if 3600 < reg.secondsToExpiry or reg.secondsToExpiry < 3598: | |
696 raise RuntimeError, "bad seconds to expire: %s" % reg.secondsToExpir
y | |
697 reg.secondsToExpiry = 3600 | |
698 return reg | |
699 | |
700 def getRegistrationInfo(self, uri): | |
701 return sip.InMemoryRegistry.getRegistrationInfo(self, uri).addCallback(s
elf._cbReg) | |
702 | |
703 def registerAddress(self, domainURL, logicalURL, physicalURL): | |
704 return sip.InMemoryRegistry.registerAddress(self, domainURL, logicalURL,
physicalURL).addCallback(self._cbReg) | |
705 | |
706 class AuthorizationTestCase(unittest.TestCase): | |
707 def setUp(self): | |
708 self.proxy = sip.RegisterProxy(host="intarweb.us") | |
709 self.proxy.authorizers = self.proxy.authorizers.copy() | |
710 self.proxy.authorizers['digest'] = FakeDigestAuthorizer() | |
711 self.registry = FakeRegistry("intarweb.us") | |
712 self.proxy.registry = self.proxy.locator = self.registry | |
713 self.transport = proto_helpers.FakeDatagramTransport() | |
714 self.proxy.transport = self.transport | |
715 | |
716 r = TestRealm() | |
717 p = cred.portal.Portal(r) | |
718 c = cred.checkers.InMemoryUsernamePasswordDatabaseDontUse() | |
719 c.addUser('exarkun@intarweb.us', 'password') | |
720 p.registerChecker(c) | |
721 self.proxy.portal = p | |
722 | |
723 def tearDown(self): | |
724 for d, uri in self.registry.users.values(): | |
725 d.cancel() | |
726 del self.proxy | |
727 | |
728 def testChallenge(self): | |
729 self.proxy.datagramReceived(registerRequest, ("127.0.0.1", 5632)) | |
730 self.assertEquals( | |
731 self.transport.written[-1], | |
732 ((challengeResponse, ("127.0.0.1", 5632))) | |
733 ) | |
734 self.transport.written = [] | |
735 | |
736 self.proxy.datagramReceived(authRequest, ("127.0.0.1", 5632)) | |
737 self.assertEquals( | |
738 self.transport.written[-1], | |
739 ((okResponse, ("127.0.0.1", 5632))) | |
740 ) | |
OLD | NEW |