| OLD | NEW |
| (Empty) |
| 1 # -*- test-case-name: twisted.test.test_sip -*- | |
| 2 # Copyright (c) 2001-2004 Twisted Matrix Laboratories. | |
| 3 # See LICENSE for details. | |
| 4 | |
| 5 | |
| 6 """Session Initialization Protocol tests.""" | |
| 7 | |
| 8 from twisted.trial import unittest | |
| 9 from twisted.protocols import sip | |
| 10 from twisted.internet import defer, reactor | |
| 11 | |
| 12 from twisted.test import proto_helpers | |
| 13 | |
| 14 from twisted import cred | |
| 15 import twisted.cred.portal | |
| 16 import twisted.cred.checkers | |
| 17 | |
| 18 from zope.interface import implements | |
| 19 | |
| 20 | |
| 21 # request, prefixed by random CRLFs | |
| 22 request1 = "\n\r\n\n\r" + """\ | |
| 23 INVITE sip:foo SIP/2.0 | |
| 24 From: mo | |
| 25 To: joe | |
| 26 Content-Length: 4 | |
| 27 | |
| 28 abcd""".replace("\n", "\r\n") | |
| 29 | |
| 30 # request, no content-length | |
| 31 request2 = """INVITE sip:foo SIP/2.0 | |
| 32 From: mo | |
| 33 To: joe | |
| 34 | |
| 35 1234""".replace("\n", "\r\n") | |
| 36 | |
| 37 # request, with garbage after | |
| 38 request3 = """INVITE sip:foo SIP/2.0 | |
| 39 From: mo | |
| 40 To: joe | |
| 41 Content-Length: 4 | |
| 42 | |
| 43 1234 | |
| 44 | |
| 45 lalalal""".replace("\n", "\r\n") | |
| 46 | |
| 47 # three requests | |
| 48 request4 = """INVITE sip:foo SIP/2.0 | |
| 49 From: mo | |
| 50 To: joe | |
| 51 Content-Length: 0 | |
| 52 | |
| 53 INVITE sip:loop SIP/2.0 | |
| 54 From: foo | |
| 55 To: bar | |
| 56 Content-Length: 4 | |
| 57 | |
| 58 abcdINVITE sip:loop SIP/2.0 | |
| 59 From: foo | |
| 60 To: bar | |
| 61 Content-Length: 4 | |
| 62 | |
| 63 1234""".replace("\n", "\r\n") | |
| 64 | |
| 65 # response, no content | |
| 66 response1 = """SIP/2.0 200 OK | |
| 67 From: foo | |
| 68 To:bar | |
| 69 Content-Length: 0 | |
| 70 | |
| 71 """.replace("\n", "\r\n") | |
| 72 | |
| 73 # short header version | |
| 74 request_short = """\ | |
| 75 INVITE sip:foo SIP/2.0 | |
| 76 f: mo | |
| 77 t: joe | |
| 78 l: 4 | |
| 79 | |
| 80 abcd""".replace("\n", "\r\n") | |
| 81 | |
| 82 request_natted = """\ | |
| 83 INVITE sip:foo SIP/2.0 | |
| 84 Via: SIP/2.0/UDP 10.0.0.1:5060;rport | |
| 85 | |
| 86 """.replace("\n", "\r\n") | |
| 87 | |
| 88 class TestRealm: | |
| 89 def requestAvatar(self, avatarId, mind, *interfaces): | |
| 90 return sip.IContact, None, lambda: None | |
| 91 | |
| 92 class MessageParsingTestCase(unittest.TestCase): | |
| 93 def setUp(self): | |
| 94 self.l = [] | |
| 95 self.parser = sip.MessagesParser(self.l.append) | |
| 96 | |
| 97 def feedMessage(self, message): | |
| 98 self.parser.dataReceived(message) | |
| 99 self.parser.dataDone() | |
| 100 | |
| 101 def validateMessage(self, m, method, uri, headers, body): | |
| 102 """Validate Requests.""" | |
| 103 self.assertEquals(m.method, method) | |
| 104 self.assertEquals(m.uri.toString(), uri) | |
| 105 self.assertEquals(m.headers, headers) | |
| 106 self.assertEquals(m.body, body) | |
| 107 self.assertEquals(m.finished, 1) | |
| 108 | |
| 109 def testSimple(self): | |
| 110 l = self.l | |
| 111 self.feedMessage(request1) | |
| 112 self.assertEquals(len(l), 1) | |
| 113 self.validateMessage(l[0], "INVITE", "sip:foo", | |
| 114 {"from": ["mo"], "to": ["joe"], "content-length": [
"4"]}, | |
| 115 "abcd") | |
| 116 | |
| 117 def testTwoMessages(self): | |
| 118 l = self.l | |
| 119 self.feedMessage(request1) | |
| 120 self.feedMessage(request2) | |
| 121 self.assertEquals(len(l), 2) | |
| 122 self.validateMessage(l[0], "INVITE", "sip:foo", | |
| 123 {"from": ["mo"], "to": ["joe"], "content-length": [
"4"]}, | |
| 124 "abcd") | |
| 125 self.validateMessage(l[1], "INVITE", "sip:foo", | |
| 126 {"from": ["mo"], "to": ["joe"]}, | |
| 127 "1234") | |
| 128 | |
| 129 def testGarbage(self): | |
| 130 l = self.l | |
| 131 self.feedMessage(request3) | |
| 132 self.assertEquals(len(l), 1) | |
| 133 self.validateMessage(l[0], "INVITE", "sip:foo", | |
| 134 {"from": ["mo"], "to": ["joe"], "content-length": [
"4"]}, | |
| 135 "1234") | |
| 136 | |
| 137 def testThreeInOne(self): | |
| 138 l = self.l | |
| 139 self.feedMessage(request4) | |
| 140 self.assertEquals(len(l), 3) | |
| 141 self.validateMessage(l[0], "INVITE", "sip:foo", | |
| 142 {"from": ["mo"], "to": ["joe"], "content-length": [
"0"]}, | |
| 143 "") | |
| 144 self.validateMessage(l[1], "INVITE", "sip:loop", | |
| 145 {"from": ["foo"], "to": ["bar"], "content-length":
["4"]}, | |
| 146 "abcd") | |
| 147 self.validateMessage(l[2], "INVITE", "sip:loop", | |
| 148 {"from": ["foo"], "to": ["bar"], "content-length":
["4"]}, | |
| 149 "1234") | |
| 150 | |
| 151 def testShort(self): | |
| 152 l = self.l | |
| 153 self.feedMessage(request_short) | |
| 154 self.assertEquals(len(l), 1) | |
| 155 self.validateMessage(l[0], "INVITE", "sip:foo", | |
| 156 {"from": ["mo"], "to": ["joe"], "content-length": [
"4"]}, | |
| 157 "abcd") | |
| 158 | |
| 159 def testSimpleResponse(self): | |
| 160 l = self.l | |
| 161 self.feedMessage(response1) | |
| 162 self.assertEquals(len(l), 1) | |
| 163 m = l[0] | |
| 164 self.assertEquals(m.code, 200) | |
| 165 self.assertEquals(m.phrase, "OK") | |
| 166 self.assertEquals(m.headers, {"from": ["foo"], "to": ["bar"], "content-l
ength": ["0"]}) | |
| 167 self.assertEquals(m.body, "") | |
| 168 self.assertEquals(m.finished, 1) | |
| 169 | |
| 170 | |
| 171 class MessageParsingTestCase2(MessageParsingTestCase): | |
| 172 """Same as base class, but feed data char by char.""" | |
| 173 | |
| 174 def feedMessage(self, message): | |
| 175 for c in message: | |
| 176 self.parser.dataReceived(c) | |
| 177 self.parser.dataDone() | |
| 178 | |
| 179 | |
| 180 class MakeMessageTestCase(unittest.TestCase): | |
| 181 | |
| 182 def testRequest(self): | |
| 183 r = sip.Request("INVITE", "sip:foo") | |
| 184 r.addHeader("foo", "bar") | |
| 185 self.assertEquals(r.toString(), "INVITE sip:foo SIP/2.0\r\nFoo: bar\r\n\
r\n") | |
| 186 | |
| 187 def testResponse(self): | |
| 188 r = sip.Response(200, "OK") | |
| 189 r.addHeader("foo", "bar") | |
| 190 r.addHeader("Content-Length", "4") | |
| 191 r.bodyDataReceived("1234") | |
| 192 self.assertEquals(r.toString(), "SIP/2.0 200 OK\r\nFoo: bar\r\nContent-L
ength: 4\r\n\r\n1234") | |
| 193 | |
| 194 def testStatusCode(self): | |
| 195 r = sip.Response(200) | |
| 196 self.assertEquals(r.toString(), "SIP/2.0 200 OK\r\n\r\n") | |
| 197 | |
| 198 | |
| 199 class ViaTestCase(unittest.TestCase): | |
| 200 | |
| 201 def checkRoundtrip(self, v): | |
| 202 s = v.toString() | |
| 203 self.assertEquals(s, sip.parseViaHeader(s).toString()) | |
| 204 | |
| 205 def testExtraWhitespace(self): | |
| 206 v1 = sip.parseViaHeader('SIP/2.0/UDP 192.168.1.1:5060') | |
| 207 v2 = sip.parseViaHeader('SIP/2.0/UDP 192.168.1.1:5060') | |
| 208 self.assertEquals(v1.transport, v2.transport) | |
| 209 self.assertEquals(v1.host, v2.host) | |
| 210 self.assertEquals(v1.port, v2.port) | |
| 211 | |
| 212 def testComplex(self): | |
| 213 s = "SIP/2.0/UDP first.example.com:4000;ttl=16;maddr=224.2.0.1 ;branch=a
7c6a8dlze (Example)" | |
| 214 v = sip.parseViaHeader(s) | |
| 215 self.assertEquals(v.transport, "UDP") | |
| 216 self.assertEquals(v.host, "first.example.com") | |
| 217 self.assertEquals(v.port, 4000) | |
| 218 self.assertEquals(v.ttl, 16) | |
| 219 self.assertEquals(v.maddr, "224.2.0.1") | |
| 220 self.assertEquals(v.branch, "a7c6a8dlze") | |
| 221 self.assertEquals(v.hidden, 0) | |
| 222 self.assertEquals(v.toString(), | |
| 223 "SIP/2.0/UDP first.example.com:4000;ttl=16;branch=a7c6
a8dlze;maddr=224.2.0.1") | |
| 224 self.checkRoundtrip(v) | |
| 225 | |
| 226 def testSimple(self): | |
| 227 s = "SIP/2.0/UDP example.com;hidden" | |
| 228 v = sip.parseViaHeader(s) | |
| 229 self.assertEquals(v.transport, "UDP") | |
| 230 self.assertEquals(v.host, "example.com") | |
| 231 self.assertEquals(v.port, 5060) | |
| 232 self.assertEquals(v.ttl, None) | |
| 233 self.assertEquals(v.maddr, None) | |
| 234 self.assertEquals(v.branch, None) | |
| 235 self.assertEquals(v.hidden, 1) | |
| 236 self.assertEquals(v.toString(), | |
| 237 "SIP/2.0/UDP example.com:5060;hidden") | |
| 238 self.checkRoundtrip(v) | |
| 239 | |
| 240 def testSimpler(self): | |
| 241 v = sip.Via("example.com") | |
| 242 self.checkRoundtrip(v) | |
| 243 | |
| 244 def testRPort(self): | |
| 245 v = sip.Via("foo.bar", rport=True) | |
| 246 self.assertEquals(v.toString(), "SIP/2.0/UDP foo.bar:5060;rport") | |
| 247 | |
| 248 def testNAT(self): | |
| 249 s = "SIP/2.0/UDP 10.0.0.1:5060;received=22.13.1.5;rport=12345" | |
| 250 v = sip.parseViaHeader(s) | |
| 251 self.assertEquals(v.transport, "UDP") | |
| 252 self.assertEquals(v.host, "10.0.0.1") | |
| 253 self.assertEquals(v.port, 5060) | |
| 254 self.assertEquals(v.received, "22.13.1.5") | |
| 255 self.assertEquals(v.rport, 12345) | |
| 256 | |
| 257 self.assertNotEquals(v.toString().find("rport=12345"), -1) | |
| 258 | |
| 259 class URLTestCase(unittest.TestCase): | |
| 260 | |
| 261 def testRoundtrip(self): | |
| 262 for url in [ | |
| 263 "sip:j.doe@big.com", | |
| 264 "sip:j.doe:secret@big.com;transport=tcp", | |
| 265 "sip:j.doe@big.com?subject=project", | |
| 266 "sip:example.com", | |
| 267 ]: | |
| 268 self.assertEquals(sip.parseURL(url).toString(), url) | |
| 269 | |
| 270 def testComplex(self): | |
| 271 s = ("sip:user:pass@hosta:123;transport=udp;user=phone;method=foo;" | |
| 272 "ttl=12;maddr=1.2.3.4;blah;goo=bar?a=b&c=d") | |
| 273 url = sip.parseURL(s) | |
| 274 for k, v in [("username", "user"), ("password", "pass"), | |
| 275 ("host", "hosta"), ("port", 123), | |
| 276 ("transport", "udp"), ("usertype", "phone"), | |
| 277 ("method", "foo"), ("ttl", 12), | |
| 278 ("maddr", "1.2.3.4"), ("other", ["blah", "goo=bar"]), | |
| 279 ("headers", {"a": "b", "c": "d"})]: | |
| 280 self.assertEquals(getattr(url, k), v) | |
| 281 | |
| 282 | |
| 283 class ParseTestCase(unittest.TestCase): | |
| 284 | |
| 285 def testParseAddress(self): | |
| 286 for address, name, urls, params in [ | |
| 287 ('"A. G. Bell" <sip:foo@example.com>', "A. G. Bell", "sip:foo@exampl
e.com", {}), | |
| 288 ("Anon <sip:foo@example.com>", "Anon", "sip:foo@example.com", {}), | |
| 289 ("sip:foo@example.com", "", "sip:foo@example.com", {}), | |
| 290 ("<sip:foo@example.com>", "", "sip:foo@example.com", {}), | |
| 291 ("foo <sip:foo@example.com>;tag=bar;foo=baz", "foo", "sip:foo@exampl
e.com", {"tag": "bar", "foo": "baz"}), | |
| 292 ]: | |
| 293 gname, gurl, gparams = sip.parseAddress(address) | |
| 294 self.assertEquals(name, gname) | |
| 295 self.assertEquals(gurl.toString(), urls) | |
| 296 self.assertEquals(gparams, params) | |
| 297 | |
| 298 | |
| 299 class DummyLocator: | |
| 300 implements(sip.ILocator) | |
| 301 def getAddress(self, logicalURL): | |
| 302 return defer.succeed(sip.URL("server.com", port=5060)) | |
| 303 | |
| 304 class FailingLocator: | |
| 305 implements(sip.ILocator) | |
| 306 def getAddress(self, logicalURL): | |
| 307 return defer.fail(LookupError()) | |
| 308 | |
| 309 | |
| 310 class ProxyTestCase(unittest.TestCase): | |
| 311 | |
| 312 def setUp(self): | |
| 313 self.proxy = sip.Proxy("127.0.0.1") | |
| 314 self.proxy.locator = DummyLocator() | |
| 315 self.sent = [] | |
| 316 self.proxy.sendMessage = lambda dest, msg: self.sent.append((dest, msg)) | |
| 317 | |
| 318 def testRequestForward(self): | |
| 319 r = sip.Request("INVITE", "sip:foo") | |
| 320 r.addHeader("via", sip.Via("1.2.3.4").toString()) | |
| 321 r.addHeader("via", sip.Via("1.2.3.5").toString()) | |
| 322 r.addHeader("foo", "bar") | |
| 323 r.addHeader("to", "<sip:joe@server.com>") | |
| 324 r.addHeader("contact", "<sip:joe@1.2.3.5>") | |
| 325 self.proxy.datagramReceived(r.toString(), ("1.2.3.4", 5060)) | |
| 326 self.assertEquals(len(self.sent), 1) | |
| 327 dest, m = self.sent[0] | |
| 328 self.assertEquals(dest.port, 5060) | |
| 329 self.assertEquals(dest.host, "server.com") | |
| 330 self.assertEquals(m.uri.toString(), "sip:foo") | |
| 331 self.assertEquals(m.method, "INVITE") | |
| 332 self.assertEquals(m.headers["via"], | |
| 333 ["SIP/2.0/UDP 127.0.0.1:5060", | |
| 334 "SIP/2.0/UDP 1.2.3.4:5060", | |
| 335 "SIP/2.0/UDP 1.2.3.5:5060"]) | |
| 336 | |
| 337 | |
| 338 def testReceivedRequestForward(self): | |
| 339 r = sip.Request("INVITE", "sip:foo") | |
| 340 r.addHeader("via", sip.Via("1.2.3.4").toString()) | |
| 341 r.addHeader("foo", "bar") | |
| 342 r.addHeader("to", "<sip:joe@server.com>") | |
| 343 r.addHeader("contact", "<sip:joe@1.2.3.4>") | |
| 344 self.proxy.datagramReceived(r.toString(), ("1.1.1.1", 5060)) | |
| 345 dest, m = self.sent[0] | |
| 346 self.assertEquals(m.headers["via"], | |
| 347 ["SIP/2.0/UDP 127.0.0.1:5060", | |
| 348 "SIP/2.0/UDP 1.2.3.4:5060;received=1.1.1.1"]) | |
| 349 | |
| 350 | |
| 351 def testResponseWrongVia(self): | |
| 352 # first via must match proxy's address | |
| 353 r = sip.Response(200) | |
| 354 r.addHeader("via", sip.Via("foo.com").toString()) | |
| 355 self.proxy.datagramReceived(r.toString(), ("1.1.1.1", 5060)) | |
| 356 self.assertEquals(len(self.sent), 0) | |
| 357 | |
| 358 def testResponseForward(self): | |
| 359 r = sip.Response(200) | |
| 360 r.addHeader("via", sip.Via("127.0.0.1").toString()) | |
| 361 r.addHeader("via", sip.Via("client.com", port=1234).toString()) | |
| 362 self.proxy.datagramReceived(r.toString(), ("1.1.1.1", 5060)) | |
| 363 self.assertEquals(len(self.sent), 1) | |
| 364 dest, m = self.sent[0] | |
| 365 self.assertEquals((dest.host, dest.port), ("client.com", 1234)) | |
| 366 self.assertEquals(m.code, 200) | |
| 367 self.assertEquals(m.headers["via"], ["SIP/2.0/UDP client.com:1234"]) | |
| 368 | |
| 369 def testReceivedResponseForward(self): | |
| 370 r = sip.Response(200) | |
| 371 r.addHeader("via", sip.Via("127.0.0.1").toString()) | |
| 372 r.addHeader("via", sip.Via("10.0.0.1", received="client.com").toString()
) | |
| 373 self.proxy.datagramReceived(r.toString(), ("1.1.1.1", 5060)) | |
| 374 self.assertEquals(len(self.sent), 1) | |
| 375 dest, m = self.sent[0] | |
| 376 self.assertEquals((dest.host, dest.port), ("client.com", 5060)) | |
| 377 | |
| 378 def testResponseToUs(self): | |
| 379 r = sip.Response(200) | |
| 380 r.addHeader("via", sip.Via("127.0.0.1").toString()) | |
| 381 l = [] | |
| 382 self.proxy.gotResponse = lambda *a: l.append(a) | |
| 383 self.proxy.datagramReceived(r.toString(), ("1.1.1.1", 5060)) | |
| 384 self.assertEquals(len(l), 1) | |
| 385 m, addr = l[0] | |
| 386 self.assertEquals(len(m.headers.get("via", [])), 0) | |
| 387 self.assertEquals(m.code, 200) | |
| 388 | |
| 389 def testLoop(self): | |
| 390 r = sip.Request("INVITE", "sip:foo") | |
| 391 r.addHeader("via", sip.Via("1.2.3.4").toString()) | |
| 392 r.addHeader("via", sip.Via("127.0.0.1").toString()) | |
| 393 self.proxy.datagramReceived(r.toString(), ("client.com", 5060)) | |
| 394 self.assertEquals(self.sent, []) | |
| 395 | |
| 396 def testCantForwardRequest(self): | |
| 397 r = sip.Request("INVITE", "sip:foo") | |
| 398 r.addHeader("via", sip.Via("1.2.3.4").toString()) | |
| 399 r.addHeader("to", "<sip:joe@server.com>") | |
| 400 self.proxy.locator = FailingLocator() | |
| 401 self.proxy.datagramReceived(r.toString(), ("1.2.3.4", 5060)) | |
| 402 self.assertEquals(len(self.sent), 1) | |
| 403 dest, m = self.sent[0] | |
| 404 self.assertEquals((dest.host, dest.port), ("1.2.3.4", 5060)) | |
| 405 self.assertEquals(m.code, 404) | |
| 406 self.assertEquals(m.headers["via"], ["SIP/2.0/UDP 1.2.3.4:5060"]) | |
| 407 | |
| 408 def testCantForwardResponse(self): | |
| 409 pass | |
| 410 | |
| 411 #testCantForwardResponse.skip = "not implemented yet" | |
| 412 | |
| 413 | |
| 414 class RegistrationTestCase(unittest.TestCase): | |
| 415 | |
| 416 def setUp(self): | |
| 417 self.proxy = sip.RegisterProxy(host="127.0.0.1") | |
| 418 self.registry = sip.InMemoryRegistry("bell.example.com") | |
| 419 self.proxy.registry = self.proxy.locator = self.registry | |
| 420 self.sent = [] | |
| 421 self.proxy.sendMessage = lambda dest, msg: self.sent.append((dest, msg)) | |
| 422 | |
| 423 def tearDown(self): | |
| 424 for d, uri in self.registry.users.values(): | |
| 425 d.cancel() | |
| 426 del self.proxy | |
| 427 | |
| 428 def register(self): | |
| 429 r = sip.Request("REGISTER", "sip:bell.example.com") | |
| 430 r.addHeader("to", "sip:joe@bell.example.com") | |
| 431 r.addHeader("contact", "sip:joe@client.com:1234") | |
| 432 r.addHeader("via", sip.Via("client.com").toString()) | |
| 433 self.proxy.datagramReceived(r.toString(), ("client.com", 5060)) | |
| 434 | |
| 435 def unregister(self): | |
| 436 r = sip.Request("REGISTER", "sip:bell.example.com") | |
| 437 r.addHeader("to", "sip:joe@bell.example.com") | |
| 438 r.addHeader("contact", "*") | |
| 439 r.addHeader("via", sip.Via("client.com").toString()) | |
| 440 r.addHeader("expires", "0") | |
| 441 self.proxy.datagramReceived(r.toString(), ("client.com", 5060)) | |
| 442 | |
| 443 def testRegister(self): | |
| 444 self.register() | |
| 445 dest, m = self.sent[0] | |
| 446 self.assertEquals((dest.host, dest.port), ("client.com", 5060)) | |
| 447 self.assertEquals(m.code, 200) | |
| 448 self.assertEquals(m.headers["via"], ["SIP/2.0/UDP client.com:5060"]) | |
| 449 self.assertEquals(m.headers["to"], ["sip:joe@bell.example.com"]) | |
| 450 self.assertEquals(m.headers["contact"], ["sip:joe@client.com:5060"]) | |
| 451 self.failUnless(int(m.headers["expires"][0]) in (3600, 3601, 3599, 3598)
) | |
| 452 self.assertEquals(len(self.registry.users), 1) | |
| 453 dc, uri = self.registry.users["joe"] | |
| 454 self.assertEquals(uri.toString(), "sip:joe@client.com:5060") | |
| 455 d = self.proxy.locator.getAddress(sip.URL(username="joe", | |
| 456 host="bell.example.com")) | |
| 457 d.addCallback(lambda desturl : (desturl.host, desturl.port)) | |
| 458 d.addCallback(self.assertEquals, ('client.com', 5060)) | |
| 459 return d | |
| 460 | |
| 461 def testUnregister(self): | |
| 462 self.register() | |
| 463 self.unregister() | |
| 464 dest, m = self.sent[1] | |
| 465 self.assertEquals((dest.host, dest.port), ("client.com", 5060)) | |
| 466 self.assertEquals(m.code, 200) | |
| 467 self.assertEquals(m.headers["via"], ["SIP/2.0/UDP client.com:5060"]) | |
| 468 self.assertEquals(m.headers["to"], ["sip:joe@bell.example.com"]) | |
| 469 self.assertEquals(m.headers["contact"], ["sip:joe@client.com:5060"]) | |
| 470 self.assertEquals(m.headers["expires"], ["0"]) | |
| 471 self.assertEquals(self.registry.users, {}) | |
| 472 | |
| 473 | |
| 474 def addPortal(self): | |
| 475 r = TestRealm() | |
| 476 p = cred.portal.Portal(r) | |
| 477 c = cred.checkers.InMemoryUsernamePasswordDatabaseDontUse() | |
| 478 c.addUser('userXname@127.0.0.1', 'passXword') | |
| 479 p.registerChecker(c) | |
| 480 self.proxy.portal = p | |
| 481 | |
| 482 def testFailedAuthentication(self): | |
| 483 self.addPortal() | |
| 484 self.register() | |
| 485 | |
| 486 self.assertEquals(len(self.registry.users), 0) | |
| 487 self.assertEquals(len(self.sent), 1) | |
| 488 dest, m = self.sent[0] | |
| 489 self.assertEquals(m.code, 401) | |
| 490 | |
| 491 | |
| 492 def testBasicAuthentication(self): | |
| 493 self.addPortal() | |
| 494 self.proxy.authorizers = self.proxy.authorizers.copy() | |
| 495 self.proxy.authorizers['basic'] = sip.BasicAuthorizer() | |
| 496 | |
| 497 r = sip.Request("REGISTER", "sip:bell.example.com") | |
| 498 r.addHeader("to", "sip:joe@bell.example.com") | |
| 499 r.addHeader("contact", "sip:joe@client.com:1234") | |
| 500 r.addHeader("via", sip.Via("client.com").toString()) | |
| 501 r.addHeader("authorization", "Basic " + "userXname:passXword".encode('ba
se64')) | |
| 502 self.proxy.datagramReceived(r.toString(), ("client.com", 5060)) | |
| 503 | |
| 504 self.assertEquals(len(self.registry.users), 1) | |
| 505 self.assertEquals(len(self.sent), 1) | |
| 506 dest, m = self.sent[0] | |
| 507 self.assertEquals(m.code, 200) | |
| 508 | |
| 509 | |
| 510 def testFailedBasicAuthentication(self): | |
| 511 self.addPortal() | |
| 512 self.proxy.authorizers = self.proxy.authorizers.copy() | |
| 513 self.proxy.authorizers['basic'] = sip.BasicAuthorizer() | |
| 514 | |
| 515 r = sip.Request("REGISTER", "sip:bell.example.com") | |
| 516 r.addHeader("to", "sip:joe@bell.example.com") | |
| 517 r.addHeader("contact", "sip:joe@client.com:1234") | |
| 518 r.addHeader("via", sip.Via("client.com").toString()) | |
| 519 r.addHeader("authorization", "Basic " + "userXname:password".encode('bas
e64')) | |
| 520 self.proxy.datagramReceived(r.toString(), ("client.com", 5060)) | |
| 521 | |
| 522 self.assertEquals(len(self.registry.users), 0) | |
| 523 self.assertEquals(len(self.sent), 1) | |
| 524 dest, m = self.sent[0] | |
| 525 self.assertEquals(m.code, 401) | |
| 526 | |
| 527 def testWrongDomainRegister(self): | |
| 528 r = sip.Request("REGISTER", "sip:wrong.com") | |
| 529 r.addHeader("to", "sip:joe@bell.example.com") | |
| 530 r.addHeader("contact", "sip:joe@client.com:1234") | |
| 531 r.addHeader("via", sip.Via("client.com").toString()) | |
| 532 self.proxy.datagramReceived(r.toString(), ("client.com", 5060)) | |
| 533 self.assertEquals(len(self.sent), 0) | |
| 534 | |
| 535 def testWrongToDomainRegister(self): | |
| 536 r = sip.Request("REGISTER", "sip:bell.example.com") | |
| 537 r.addHeader("to", "sip:joe@foo.com") | |
| 538 r.addHeader("contact", "sip:joe@client.com:1234") | |
| 539 r.addHeader("via", sip.Via("client.com").toString()) | |
| 540 self.proxy.datagramReceived(r.toString(), ("client.com", 5060)) | |
| 541 self.assertEquals(len(self.sent), 0) | |
| 542 | |
| 543 def testWrongDomainLookup(self): | |
| 544 self.register() | |
| 545 url = sip.URL(username="joe", host="foo.com") | |
| 546 d = self.proxy.locator.getAddress(url) | |
| 547 self.assertFailure(d, LookupError) | |
| 548 return d | |
| 549 | |
| 550 def testNoContactLookup(self): | |
| 551 self.register() | |
| 552 url = sip.URL(username="jane", host="bell.example.com") | |
| 553 d = self.proxy.locator.getAddress(url) | |
| 554 self.assertFailure(d, LookupError) | |
| 555 return d | |
| 556 | |
| 557 | |
| 558 class Client(sip.Base): | |
| 559 | |
| 560 def __init__(self): | |
| 561 sip.Base.__init__(self) | |
| 562 self.received = [] | |
| 563 self.deferred = defer.Deferred() | |
| 564 | |
| 565 def handle_response(self, response, addr): | |
| 566 self.received.append(response) | |
| 567 self.deferred.callback(self.received) | |
| 568 | |
| 569 | |
| 570 class LiveTest(unittest.TestCase): | |
| 571 | |
| 572 def setUp(self): | |
| 573 self.proxy = sip.RegisterProxy(host="127.0.0.1") | |
| 574 self.registry = sip.InMemoryRegistry("bell.example.com") | |
| 575 self.proxy.registry = self.proxy.locator = self.registry | |
| 576 self.serverPort = reactor.listenUDP(0, self.proxy, interface="127.0.0.1"
) | |
| 577 self.client = Client() | |
| 578 self.clientPort = reactor.listenUDP(0, self.client, interface="127.0.0.1
") | |
| 579 self.serverAddress = (self.serverPort.getHost().host, | |
| 580 self.serverPort.getHost().port) | |
| 581 | |
| 582 def tearDown(self): | |
| 583 for d, uri in self.registry.users.values(): | |
| 584 d.cancel() | |
| 585 d1 = defer.maybeDeferred(self.clientPort.stopListening) | |
| 586 d2 = defer.maybeDeferred(self.serverPort.stopListening) | |
| 587 return defer.gatherResults([d1, d2]) | |
| 588 | |
| 589 def testRegister(self): | |
| 590 p = self.clientPort.getHost().port | |
| 591 r = sip.Request("REGISTER", "sip:bell.example.com") | |
| 592 r.addHeader("to", "sip:joe@bell.example.com") | |
| 593 r.addHeader("contact", "sip:joe@127.0.0.1:%d" % p) | |
| 594 r.addHeader("via", sip.Via("127.0.0.1", port=p).toString()) | |
| 595 self.client.sendMessage(sip.URL(host="127.0.0.1", port=self.serverAddres
s[1]), | |
| 596 r) | |
| 597 d = self.client.deferred | |
| 598 def check(received): | |
| 599 self.assertEquals(len(received), 1) | |
| 600 r = received[0] | |
| 601 self.assertEquals(r.code, 200) | |
| 602 d.addCallback(check) | |
| 603 return d | |
| 604 | |
| 605 def testAmoralRPort(self): | |
| 606 # rport is allowed without a value, apparently because server | |
| 607 # implementors might be too stupid to check the received port | |
| 608 # against 5060 and see if they're equal, and because client | |
| 609 # implementors might be too stupid to bind to port 5060, or set a | |
| 610 # value on the rport parameter they send if they bind to another | |
| 611 # port. | |
| 612 p = self.clientPort.getHost().port | |
| 613 r = sip.Request("REGISTER", "sip:bell.example.com") | |
| 614 r.addHeader("to", "sip:joe@bell.example.com") | |
| 615 r.addHeader("contact", "sip:joe@127.0.0.1:%d" % p) | |
| 616 r.addHeader("via", sip.Via("127.0.0.1", port=p, rport=True).toString()) | |
| 617 self.client.sendMessage(sip.URL(host="127.0.0.1", port=self.serverAddres
s[1]), | |
| 618 r) | |
| 619 d = self.client.deferred | |
| 620 def check(received): | |
| 621 self.assertEquals(len(received), 1) | |
| 622 r = received[0] | |
| 623 self.assertEquals(r.code, 200) | |
| 624 d.addCallback(check) | |
| 625 return d | |
| 626 | |
| 627 | |
| 628 registerRequest = """ | |
| 629 REGISTER sip:intarweb.us SIP/2.0\r | |
| 630 Via: SIP/2.0/UDP 192.168.1.100:50609\r | |
| 631 From: <sip:exarkun@intarweb.us:50609>\r | |
| 632 To: <sip:exarkun@intarweb.us:50609>\r | |
| 633 Contact: "exarkun" <sip:exarkun@192.168.1.100:50609>\r | |
| 634 Call-ID: 94E7E5DAF39111D791C6000393764646@intarweb.us\r | |
| 635 CSeq: 9898 REGISTER\r | |
| 636 Expires: 500\r | |
| 637 User-Agent: X-Lite build 1061\r | |
| 638 Content-Length: 0\r | |
| 639 \r | |
| 640 """ | |
| 641 | |
| 642 challengeResponse = """\ | |
| 643 SIP/2.0 401 Unauthorized\r | |
| 644 Via: SIP/2.0/UDP 192.168.1.100:50609;received=127.0.0.1;rport=5632\r | |
| 645 To: <sip:exarkun@intarweb.us:50609>\r | |
| 646 From: <sip:exarkun@intarweb.us:50609>\r | |
| 647 Call-ID: 94E7E5DAF39111D791C6000393764646@intarweb.us\r | |
| 648 CSeq: 9898 REGISTER\r | |
| 649 WWW-Authenticate: Digest nonce="92956076410767313901322208775",opaque="167418642
8",qop-options="auth",algorithm="MD5",realm="intarweb.us"\r | |
| 650 \r | |
| 651 """ | |
| 652 | |
| 653 authRequest = """\ | |
| 654 REGISTER sip:intarweb.us SIP/2.0\r | |
| 655 Via: SIP/2.0/UDP 192.168.1.100:50609\r | |
| 656 From: <sip:exarkun@intarweb.us:50609>\r | |
| 657 To: <sip:exarkun@intarweb.us:50609>\r | |
| 658 Contact: "exarkun" <sip:exarkun@192.168.1.100:50609>\r | |
| 659 Call-ID: 94E7E5DAF39111D791C6000393764646@intarweb.us\r | |
| 660 CSeq: 9899 REGISTER\r | |
| 661 Expires: 500\r | |
| 662 Authorization: Digest username="exarkun",realm="intarweb.us",nonce="929560764107
67313901322208775",response="4a47980eea31694f997369214292374b",uri="sip:intarweb
.us",algorithm=MD5,opaque="1674186428"\r | |
| 663 User-Agent: X-Lite build 1061\r | |
| 664 Content-Length: 0\r | |
| 665 \r | |
| 666 """ | |
| 667 | |
| 668 okResponse = """\ | |
| 669 SIP/2.0 200 OK\r | |
| 670 Via: SIP/2.0/UDP 192.168.1.100:50609;received=127.0.0.1;rport=5632\r | |
| 671 To: <sip:exarkun@intarweb.us:50609>\r | |
| 672 From: <sip:exarkun@intarweb.us:50609>\r | |
| 673 Call-ID: 94E7E5DAF39111D791C6000393764646@intarweb.us\r | |
| 674 CSeq: 9899 REGISTER\r | |
| 675 Contact: sip:exarkun@127.0.0.1:5632\r | |
| 676 Expires: 3600\r | |
| 677 Content-Length: 0\r | |
| 678 \r | |
| 679 """ | |
| 680 | |
| 681 class FakeDigestAuthorizer(sip.DigestAuthorizer): | |
| 682 def generateNonce(self): | |
| 683 return '92956076410767313901322208775' | |
| 684 def generateOpaque(self): | |
| 685 return '1674186428' | |
| 686 | |
| 687 | |
| 688 class FakeRegistry(sip.InMemoryRegistry): | |
| 689 """Make sure expiration is always seen to be 3600. | |
| 690 | |
| 691 Otherwise slow reactors fail tests incorrectly. | |
| 692 """ | |
| 693 | |
| 694 def _cbReg(self, reg): | |
| 695 if 3600 < reg.secondsToExpiry or reg.secondsToExpiry < 3598: | |
| 696 raise RuntimeError, "bad seconds to expire: %s" % reg.secondsToExpir
y | |
| 697 reg.secondsToExpiry = 3600 | |
| 698 return reg | |
| 699 | |
| 700 def getRegistrationInfo(self, uri): | |
| 701 return sip.InMemoryRegistry.getRegistrationInfo(self, uri).addCallback(s
elf._cbReg) | |
| 702 | |
| 703 def registerAddress(self, domainURL, logicalURL, physicalURL): | |
| 704 return sip.InMemoryRegistry.registerAddress(self, domainURL, logicalURL,
physicalURL).addCallback(self._cbReg) | |
| 705 | |
| 706 class AuthorizationTestCase(unittest.TestCase): | |
| 707 def setUp(self): | |
| 708 self.proxy = sip.RegisterProxy(host="intarweb.us") | |
| 709 self.proxy.authorizers = self.proxy.authorizers.copy() | |
| 710 self.proxy.authorizers['digest'] = FakeDigestAuthorizer() | |
| 711 self.registry = FakeRegistry("intarweb.us") | |
| 712 self.proxy.registry = self.proxy.locator = self.registry | |
| 713 self.transport = proto_helpers.FakeDatagramTransport() | |
| 714 self.proxy.transport = self.transport | |
| 715 | |
| 716 r = TestRealm() | |
| 717 p = cred.portal.Portal(r) | |
| 718 c = cred.checkers.InMemoryUsernamePasswordDatabaseDontUse() | |
| 719 c.addUser('exarkun@intarweb.us', 'password') | |
| 720 p.registerChecker(c) | |
| 721 self.proxy.portal = p | |
| 722 | |
| 723 def tearDown(self): | |
| 724 for d, uri in self.registry.users.values(): | |
| 725 d.cancel() | |
| 726 del self.proxy | |
| 727 | |
| 728 def testChallenge(self): | |
| 729 self.proxy.datagramReceived(registerRequest, ("127.0.0.1", 5632)) | |
| 730 self.assertEquals( | |
| 731 self.transport.written[-1], | |
| 732 ((challengeResponse, ("127.0.0.1", 5632))) | |
| 733 ) | |
| 734 self.transport.written = [] | |
| 735 | |
| 736 self.proxy.datagramReceived(authRequest, ("127.0.0.1", 5632)) | |
| 737 self.assertEquals( | |
| 738 self.transport.written[-1], | |
| 739 ((okResponse, ("127.0.0.1", 5632))) | |
| 740 ) | |
| OLD | NEW |