| Index: third_party/twisted_8_1/twisted/test/test_sip.py
|
| diff --git a/third_party/twisted_8_1/twisted/test/test_sip.py b/third_party/twisted_8_1/twisted/test/test_sip.py
|
| deleted file mode 100644
|
| index 332fcb2d5b4341afffcb95475dc5295bcccfc9aa..0000000000000000000000000000000000000000
|
| --- a/third_party/twisted_8_1/twisted/test/test_sip.py
|
| +++ /dev/null
|
| @@ -1,740 +0,0 @@
|
| -# -*- test-case-name: twisted.test.test_sip -*-
|
| -# Copyright (c) 2001-2004 Twisted Matrix Laboratories.
|
| -# See LICENSE for details.
|
| -
|
| -
|
| -"""Session Initialization Protocol tests."""
|
| -
|
| -from twisted.trial import unittest
|
| -from twisted.protocols import sip
|
| -from twisted.internet import defer, reactor
|
| -
|
| -from twisted.test import proto_helpers
|
| -
|
| -from twisted import cred
|
| -import twisted.cred.portal
|
| -import twisted.cred.checkers
|
| -
|
| -from zope.interface import implements
|
| -
|
| -
|
| -# request, prefixed by random CRLFs
|
| -request1 = "\n\r\n\n\r" + """\
|
| -INVITE sip:foo SIP/2.0
|
| -From: mo
|
| -To: joe
|
| -Content-Length: 4
|
| -
|
| -abcd""".replace("\n", "\r\n")
|
| -
|
| -# request, no content-length
|
| -request2 = """INVITE sip:foo SIP/2.0
|
| -From: mo
|
| -To: joe
|
| -
|
| -1234""".replace("\n", "\r\n")
|
| -
|
| -# request, with garbage after
|
| -request3 = """INVITE sip:foo SIP/2.0
|
| -From: mo
|
| -To: joe
|
| -Content-Length: 4
|
| -
|
| -1234
|
| -
|
| -lalalal""".replace("\n", "\r\n")
|
| -
|
| -# three requests
|
| -request4 = """INVITE sip:foo SIP/2.0
|
| -From: mo
|
| -To: joe
|
| -Content-Length: 0
|
| -
|
| -INVITE sip:loop SIP/2.0
|
| -From: foo
|
| -To: bar
|
| -Content-Length: 4
|
| -
|
| -abcdINVITE sip:loop SIP/2.0
|
| -From: foo
|
| -To: bar
|
| -Content-Length: 4
|
| -
|
| -1234""".replace("\n", "\r\n")
|
| -
|
| -# response, no content
|
| -response1 = """SIP/2.0 200 OK
|
| -From: foo
|
| -To:bar
|
| -Content-Length: 0
|
| -
|
| -""".replace("\n", "\r\n")
|
| -
|
| -# short header version
|
| -request_short = """\
|
| -INVITE sip:foo SIP/2.0
|
| -f: mo
|
| -t: joe
|
| -l: 4
|
| -
|
| -abcd""".replace("\n", "\r\n")
|
| -
|
| -request_natted = """\
|
| -INVITE sip:foo SIP/2.0
|
| -Via: SIP/2.0/UDP 10.0.0.1:5060;rport
|
| -
|
| -""".replace("\n", "\r\n")
|
| -
|
| -class TestRealm:
|
| - def requestAvatar(self, avatarId, mind, *interfaces):
|
| - return sip.IContact, None, lambda: None
|
| -
|
| -class MessageParsingTestCase(unittest.TestCase):
|
| - def setUp(self):
|
| - self.l = []
|
| - self.parser = sip.MessagesParser(self.l.append)
|
| -
|
| - def feedMessage(self, message):
|
| - self.parser.dataReceived(message)
|
| - self.parser.dataDone()
|
| -
|
| - def validateMessage(self, m, method, uri, headers, body):
|
| - """Validate Requests."""
|
| - self.assertEquals(m.method, method)
|
| - self.assertEquals(m.uri.toString(), uri)
|
| - self.assertEquals(m.headers, headers)
|
| - self.assertEquals(m.body, body)
|
| - self.assertEquals(m.finished, 1)
|
| -
|
| - def testSimple(self):
|
| - l = self.l
|
| - self.feedMessage(request1)
|
| - self.assertEquals(len(l), 1)
|
| - self.validateMessage(l[0], "INVITE", "sip:foo",
|
| - {"from": ["mo"], "to": ["joe"], "content-length": ["4"]},
|
| - "abcd")
|
| -
|
| - def testTwoMessages(self):
|
| - l = self.l
|
| - self.feedMessage(request1)
|
| - self.feedMessage(request2)
|
| - self.assertEquals(len(l), 2)
|
| - self.validateMessage(l[0], "INVITE", "sip:foo",
|
| - {"from": ["mo"], "to": ["joe"], "content-length": ["4"]},
|
| - "abcd")
|
| - self.validateMessage(l[1], "INVITE", "sip:foo",
|
| - {"from": ["mo"], "to": ["joe"]},
|
| - "1234")
|
| -
|
| - def testGarbage(self):
|
| - l = self.l
|
| - self.feedMessage(request3)
|
| - self.assertEquals(len(l), 1)
|
| - self.validateMessage(l[0], "INVITE", "sip:foo",
|
| - {"from": ["mo"], "to": ["joe"], "content-length": ["4"]},
|
| - "1234")
|
| -
|
| - def testThreeInOne(self):
|
| - l = self.l
|
| - self.feedMessage(request4)
|
| - self.assertEquals(len(l), 3)
|
| - self.validateMessage(l[0], "INVITE", "sip:foo",
|
| - {"from": ["mo"], "to": ["joe"], "content-length": ["0"]},
|
| - "")
|
| - self.validateMessage(l[1], "INVITE", "sip:loop",
|
| - {"from": ["foo"], "to": ["bar"], "content-length": ["4"]},
|
| - "abcd")
|
| - self.validateMessage(l[2], "INVITE", "sip:loop",
|
| - {"from": ["foo"], "to": ["bar"], "content-length": ["4"]},
|
| - "1234")
|
| -
|
| - def testShort(self):
|
| - l = self.l
|
| - self.feedMessage(request_short)
|
| - self.assertEquals(len(l), 1)
|
| - self.validateMessage(l[0], "INVITE", "sip:foo",
|
| - {"from": ["mo"], "to": ["joe"], "content-length": ["4"]},
|
| - "abcd")
|
| -
|
| - def testSimpleResponse(self):
|
| - l = self.l
|
| - self.feedMessage(response1)
|
| - self.assertEquals(len(l), 1)
|
| - m = l[0]
|
| - self.assertEquals(m.code, 200)
|
| - self.assertEquals(m.phrase, "OK")
|
| - self.assertEquals(m.headers, {"from": ["foo"], "to": ["bar"], "content-length": ["0"]})
|
| - self.assertEquals(m.body, "")
|
| - self.assertEquals(m.finished, 1)
|
| -
|
| -
|
| -class MessageParsingTestCase2(MessageParsingTestCase):
|
| - """Same as base class, but feed data char by char."""
|
| -
|
| - def feedMessage(self, message):
|
| - for c in message:
|
| - self.parser.dataReceived(c)
|
| - self.parser.dataDone()
|
| -
|
| -
|
| -class MakeMessageTestCase(unittest.TestCase):
|
| -
|
| - def testRequest(self):
|
| - r = sip.Request("INVITE", "sip:foo")
|
| - r.addHeader("foo", "bar")
|
| - self.assertEquals(r.toString(), "INVITE sip:foo SIP/2.0\r\nFoo: bar\r\n\r\n")
|
| -
|
| - def testResponse(self):
|
| - r = sip.Response(200, "OK")
|
| - r.addHeader("foo", "bar")
|
| - r.addHeader("Content-Length", "4")
|
| - r.bodyDataReceived("1234")
|
| - self.assertEquals(r.toString(), "SIP/2.0 200 OK\r\nFoo: bar\r\nContent-Length: 4\r\n\r\n1234")
|
| -
|
| - def testStatusCode(self):
|
| - r = sip.Response(200)
|
| - self.assertEquals(r.toString(), "SIP/2.0 200 OK\r\n\r\n")
|
| -
|
| -
|
| -class ViaTestCase(unittest.TestCase):
|
| -
|
| - def checkRoundtrip(self, v):
|
| - s = v.toString()
|
| - self.assertEquals(s, sip.parseViaHeader(s).toString())
|
| -
|
| - def testExtraWhitespace(self):
|
| - v1 = sip.parseViaHeader('SIP/2.0/UDP 192.168.1.1:5060')
|
| - v2 = sip.parseViaHeader('SIP/2.0/UDP 192.168.1.1:5060')
|
| - self.assertEquals(v1.transport, v2.transport)
|
| - self.assertEquals(v1.host, v2.host)
|
| - self.assertEquals(v1.port, v2.port)
|
| -
|
| - def testComplex(self):
|
| - s = "SIP/2.0/UDP first.example.com:4000;ttl=16;maddr=224.2.0.1 ;branch=a7c6a8dlze (Example)"
|
| - v = sip.parseViaHeader(s)
|
| - self.assertEquals(v.transport, "UDP")
|
| - self.assertEquals(v.host, "first.example.com")
|
| - self.assertEquals(v.port, 4000)
|
| - self.assertEquals(v.ttl, 16)
|
| - self.assertEquals(v.maddr, "224.2.0.1")
|
| - self.assertEquals(v.branch, "a7c6a8dlze")
|
| - self.assertEquals(v.hidden, 0)
|
| - self.assertEquals(v.toString(),
|
| - "SIP/2.0/UDP first.example.com:4000;ttl=16;branch=a7c6a8dlze;maddr=224.2.0.1")
|
| - self.checkRoundtrip(v)
|
| -
|
| - def testSimple(self):
|
| - s = "SIP/2.0/UDP example.com;hidden"
|
| - v = sip.parseViaHeader(s)
|
| - self.assertEquals(v.transport, "UDP")
|
| - self.assertEquals(v.host, "example.com")
|
| - self.assertEquals(v.port, 5060)
|
| - self.assertEquals(v.ttl, None)
|
| - self.assertEquals(v.maddr, None)
|
| - self.assertEquals(v.branch, None)
|
| - self.assertEquals(v.hidden, 1)
|
| - self.assertEquals(v.toString(),
|
| - "SIP/2.0/UDP example.com:5060;hidden")
|
| - self.checkRoundtrip(v)
|
| -
|
| - def testSimpler(self):
|
| - v = sip.Via("example.com")
|
| - self.checkRoundtrip(v)
|
| -
|
| - def testRPort(self):
|
| - v = sip.Via("foo.bar", rport=True)
|
| - self.assertEquals(v.toString(), "SIP/2.0/UDP foo.bar:5060;rport")
|
| -
|
| - def testNAT(self):
|
| - s = "SIP/2.0/UDP 10.0.0.1:5060;received=22.13.1.5;rport=12345"
|
| - v = sip.parseViaHeader(s)
|
| - self.assertEquals(v.transport, "UDP")
|
| - self.assertEquals(v.host, "10.0.0.1")
|
| - self.assertEquals(v.port, 5060)
|
| - self.assertEquals(v.received, "22.13.1.5")
|
| - self.assertEquals(v.rport, 12345)
|
| -
|
| - self.assertNotEquals(v.toString().find("rport=12345"), -1)
|
| -
|
| -class URLTestCase(unittest.TestCase):
|
| -
|
| - def testRoundtrip(self):
|
| - for url in [
|
| - "sip:j.doe@big.com",
|
| - "sip:j.doe:secret@big.com;transport=tcp",
|
| - "sip:j.doe@big.com?subject=project",
|
| - "sip:example.com",
|
| - ]:
|
| - self.assertEquals(sip.parseURL(url).toString(), url)
|
| -
|
| - def testComplex(self):
|
| - s = ("sip:user:pass@hosta:123;transport=udp;user=phone;method=foo;"
|
| - "ttl=12;maddr=1.2.3.4;blah;goo=bar?a=b&c=d")
|
| - url = sip.parseURL(s)
|
| - for k, v in [("username", "user"), ("password", "pass"),
|
| - ("host", "hosta"), ("port", 123),
|
| - ("transport", "udp"), ("usertype", "phone"),
|
| - ("method", "foo"), ("ttl", 12),
|
| - ("maddr", "1.2.3.4"), ("other", ["blah", "goo=bar"]),
|
| - ("headers", {"a": "b", "c": "d"})]:
|
| - self.assertEquals(getattr(url, k), v)
|
| -
|
| -
|
| -class ParseTestCase(unittest.TestCase):
|
| -
|
| - def testParseAddress(self):
|
| - for address, name, urls, params in [
|
| - ('"A. G. Bell" <sip:foo@example.com>', "A. G. Bell", "sip:foo@example.com", {}),
|
| - ("Anon <sip:foo@example.com>", "Anon", "sip:foo@example.com", {}),
|
| - ("sip:foo@example.com", "", "sip:foo@example.com", {}),
|
| - ("<sip:foo@example.com>", "", "sip:foo@example.com", {}),
|
| - ("foo <sip:foo@example.com>;tag=bar;foo=baz", "foo", "sip:foo@example.com", {"tag": "bar", "foo": "baz"}),
|
| - ]:
|
| - gname, gurl, gparams = sip.parseAddress(address)
|
| - self.assertEquals(name, gname)
|
| - self.assertEquals(gurl.toString(), urls)
|
| - self.assertEquals(gparams, params)
|
| -
|
| -
|
| -class DummyLocator:
|
| - implements(sip.ILocator)
|
| - def getAddress(self, logicalURL):
|
| - return defer.succeed(sip.URL("server.com", port=5060))
|
| -
|
| -class FailingLocator:
|
| - implements(sip.ILocator)
|
| - def getAddress(self, logicalURL):
|
| - return defer.fail(LookupError())
|
| -
|
| -
|
| -class ProxyTestCase(unittest.TestCase):
|
| -
|
| - def setUp(self):
|
| - self.proxy = sip.Proxy("127.0.0.1")
|
| - self.proxy.locator = DummyLocator()
|
| - self.sent = []
|
| - self.proxy.sendMessage = lambda dest, msg: self.sent.append((dest, msg))
|
| -
|
| - def testRequestForward(self):
|
| - r = sip.Request("INVITE", "sip:foo")
|
| - r.addHeader("via", sip.Via("1.2.3.4").toString())
|
| - r.addHeader("via", sip.Via("1.2.3.5").toString())
|
| - r.addHeader("foo", "bar")
|
| - r.addHeader("to", "<sip:joe@server.com>")
|
| - r.addHeader("contact", "<sip:joe@1.2.3.5>")
|
| - self.proxy.datagramReceived(r.toString(), ("1.2.3.4", 5060))
|
| - self.assertEquals(len(self.sent), 1)
|
| - dest, m = self.sent[0]
|
| - self.assertEquals(dest.port, 5060)
|
| - self.assertEquals(dest.host, "server.com")
|
| - self.assertEquals(m.uri.toString(), "sip:foo")
|
| - self.assertEquals(m.method, "INVITE")
|
| - self.assertEquals(m.headers["via"],
|
| - ["SIP/2.0/UDP 127.0.0.1:5060",
|
| - "SIP/2.0/UDP 1.2.3.4:5060",
|
| - "SIP/2.0/UDP 1.2.3.5:5060"])
|
| -
|
| -
|
| - def testReceivedRequestForward(self):
|
| - r = sip.Request("INVITE", "sip:foo")
|
| - r.addHeader("via", sip.Via("1.2.3.4").toString())
|
| - r.addHeader("foo", "bar")
|
| - r.addHeader("to", "<sip:joe@server.com>")
|
| - r.addHeader("contact", "<sip:joe@1.2.3.4>")
|
| - self.proxy.datagramReceived(r.toString(), ("1.1.1.1", 5060))
|
| - dest, m = self.sent[0]
|
| - self.assertEquals(m.headers["via"],
|
| - ["SIP/2.0/UDP 127.0.0.1:5060",
|
| - "SIP/2.0/UDP 1.2.3.4:5060;received=1.1.1.1"])
|
| -
|
| -
|
| - def testResponseWrongVia(self):
|
| - # first via must match proxy's address
|
| - r = sip.Response(200)
|
| - r.addHeader("via", sip.Via("foo.com").toString())
|
| - self.proxy.datagramReceived(r.toString(), ("1.1.1.1", 5060))
|
| - self.assertEquals(len(self.sent), 0)
|
| -
|
| - def testResponseForward(self):
|
| - r = sip.Response(200)
|
| - r.addHeader("via", sip.Via("127.0.0.1").toString())
|
| - r.addHeader("via", sip.Via("client.com", port=1234).toString())
|
| - self.proxy.datagramReceived(r.toString(), ("1.1.1.1", 5060))
|
| - self.assertEquals(len(self.sent), 1)
|
| - dest, m = self.sent[0]
|
| - self.assertEquals((dest.host, dest.port), ("client.com", 1234))
|
| - self.assertEquals(m.code, 200)
|
| - self.assertEquals(m.headers["via"], ["SIP/2.0/UDP client.com:1234"])
|
| -
|
| - def testReceivedResponseForward(self):
|
| - r = sip.Response(200)
|
| - r.addHeader("via", sip.Via("127.0.0.1").toString())
|
| - r.addHeader("via", sip.Via("10.0.0.1", received="client.com").toString())
|
| - self.proxy.datagramReceived(r.toString(), ("1.1.1.1", 5060))
|
| - self.assertEquals(len(self.sent), 1)
|
| - dest, m = self.sent[0]
|
| - self.assertEquals((dest.host, dest.port), ("client.com", 5060))
|
| -
|
| - def testResponseToUs(self):
|
| - r = sip.Response(200)
|
| - r.addHeader("via", sip.Via("127.0.0.1").toString())
|
| - l = []
|
| - self.proxy.gotResponse = lambda *a: l.append(a)
|
| - self.proxy.datagramReceived(r.toString(), ("1.1.1.1", 5060))
|
| - self.assertEquals(len(l), 1)
|
| - m, addr = l[0]
|
| - self.assertEquals(len(m.headers.get("via", [])), 0)
|
| - self.assertEquals(m.code, 200)
|
| -
|
| - def testLoop(self):
|
| - r = sip.Request("INVITE", "sip:foo")
|
| - r.addHeader("via", sip.Via("1.2.3.4").toString())
|
| - r.addHeader("via", sip.Via("127.0.0.1").toString())
|
| - self.proxy.datagramReceived(r.toString(), ("client.com", 5060))
|
| - self.assertEquals(self.sent, [])
|
| -
|
| - def testCantForwardRequest(self):
|
| - r = sip.Request("INVITE", "sip:foo")
|
| - r.addHeader("via", sip.Via("1.2.3.4").toString())
|
| - r.addHeader("to", "<sip:joe@server.com>")
|
| - self.proxy.locator = FailingLocator()
|
| - self.proxy.datagramReceived(r.toString(), ("1.2.3.4", 5060))
|
| - self.assertEquals(len(self.sent), 1)
|
| - dest, m = self.sent[0]
|
| - self.assertEquals((dest.host, dest.port), ("1.2.3.4", 5060))
|
| - self.assertEquals(m.code, 404)
|
| - self.assertEquals(m.headers["via"], ["SIP/2.0/UDP 1.2.3.4:5060"])
|
| -
|
| - def testCantForwardResponse(self):
|
| - pass
|
| -
|
| - #testCantForwardResponse.skip = "not implemented yet"
|
| -
|
| -
|
| -class RegistrationTestCase(unittest.TestCase):
|
| -
|
| - def setUp(self):
|
| - self.proxy = sip.RegisterProxy(host="127.0.0.1")
|
| - self.registry = sip.InMemoryRegistry("bell.example.com")
|
| - self.proxy.registry = self.proxy.locator = self.registry
|
| - self.sent = []
|
| - self.proxy.sendMessage = lambda dest, msg: self.sent.append((dest, msg))
|
| -
|
| - def tearDown(self):
|
| - for d, uri in self.registry.users.values():
|
| - d.cancel()
|
| - del self.proxy
|
| -
|
| - def register(self):
|
| - r = sip.Request("REGISTER", "sip:bell.example.com")
|
| - r.addHeader("to", "sip:joe@bell.example.com")
|
| - r.addHeader("contact", "sip:joe@client.com:1234")
|
| - r.addHeader("via", sip.Via("client.com").toString())
|
| - self.proxy.datagramReceived(r.toString(), ("client.com", 5060))
|
| -
|
| - def unregister(self):
|
| - r = sip.Request("REGISTER", "sip:bell.example.com")
|
| - r.addHeader("to", "sip:joe@bell.example.com")
|
| - r.addHeader("contact", "*")
|
| - r.addHeader("via", sip.Via("client.com").toString())
|
| - r.addHeader("expires", "0")
|
| - self.proxy.datagramReceived(r.toString(), ("client.com", 5060))
|
| -
|
| - def testRegister(self):
|
| - self.register()
|
| - dest, m = self.sent[0]
|
| - self.assertEquals((dest.host, dest.port), ("client.com", 5060))
|
| - self.assertEquals(m.code, 200)
|
| - self.assertEquals(m.headers["via"], ["SIP/2.0/UDP client.com:5060"])
|
| - self.assertEquals(m.headers["to"], ["sip:joe@bell.example.com"])
|
| - self.assertEquals(m.headers["contact"], ["sip:joe@client.com:5060"])
|
| - self.failUnless(int(m.headers["expires"][0]) in (3600, 3601, 3599, 3598))
|
| - self.assertEquals(len(self.registry.users), 1)
|
| - dc, uri = self.registry.users["joe"]
|
| - self.assertEquals(uri.toString(), "sip:joe@client.com:5060")
|
| - d = self.proxy.locator.getAddress(sip.URL(username="joe",
|
| - host="bell.example.com"))
|
| - d.addCallback(lambda desturl : (desturl.host, desturl.port))
|
| - d.addCallback(self.assertEquals, ('client.com', 5060))
|
| - return d
|
| -
|
| - def testUnregister(self):
|
| - self.register()
|
| - self.unregister()
|
| - dest, m = self.sent[1]
|
| - self.assertEquals((dest.host, dest.port), ("client.com", 5060))
|
| - self.assertEquals(m.code, 200)
|
| - self.assertEquals(m.headers["via"], ["SIP/2.0/UDP client.com:5060"])
|
| - self.assertEquals(m.headers["to"], ["sip:joe@bell.example.com"])
|
| - self.assertEquals(m.headers["contact"], ["sip:joe@client.com:5060"])
|
| - self.assertEquals(m.headers["expires"], ["0"])
|
| - self.assertEquals(self.registry.users, {})
|
| -
|
| -
|
| - def addPortal(self):
|
| - r = TestRealm()
|
| - p = cred.portal.Portal(r)
|
| - c = cred.checkers.InMemoryUsernamePasswordDatabaseDontUse()
|
| - c.addUser('userXname@127.0.0.1', 'passXword')
|
| - p.registerChecker(c)
|
| - self.proxy.portal = p
|
| -
|
| - def testFailedAuthentication(self):
|
| - self.addPortal()
|
| - self.register()
|
| -
|
| - self.assertEquals(len(self.registry.users), 0)
|
| - self.assertEquals(len(self.sent), 1)
|
| - dest, m = self.sent[0]
|
| - self.assertEquals(m.code, 401)
|
| -
|
| -
|
| - def testBasicAuthentication(self):
|
| - self.addPortal()
|
| - self.proxy.authorizers = self.proxy.authorizers.copy()
|
| - self.proxy.authorizers['basic'] = sip.BasicAuthorizer()
|
| -
|
| - r = sip.Request("REGISTER", "sip:bell.example.com")
|
| - r.addHeader("to", "sip:joe@bell.example.com")
|
| - r.addHeader("contact", "sip:joe@client.com:1234")
|
| - r.addHeader("via", sip.Via("client.com").toString())
|
| - r.addHeader("authorization", "Basic " + "userXname:passXword".encode('base64'))
|
| - self.proxy.datagramReceived(r.toString(), ("client.com", 5060))
|
| -
|
| - self.assertEquals(len(self.registry.users), 1)
|
| - self.assertEquals(len(self.sent), 1)
|
| - dest, m = self.sent[0]
|
| - self.assertEquals(m.code, 200)
|
| -
|
| -
|
| - def testFailedBasicAuthentication(self):
|
| - self.addPortal()
|
| - self.proxy.authorizers = self.proxy.authorizers.copy()
|
| - self.proxy.authorizers['basic'] = sip.BasicAuthorizer()
|
| -
|
| - r = sip.Request("REGISTER", "sip:bell.example.com")
|
| - r.addHeader("to", "sip:joe@bell.example.com")
|
| - r.addHeader("contact", "sip:joe@client.com:1234")
|
| - r.addHeader("via", sip.Via("client.com").toString())
|
| - r.addHeader("authorization", "Basic " + "userXname:password".encode('base64'))
|
| - self.proxy.datagramReceived(r.toString(), ("client.com", 5060))
|
| -
|
| - self.assertEquals(len(self.registry.users), 0)
|
| - self.assertEquals(len(self.sent), 1)
|
| - dest, m = self.sent[0]
|
| - self.assertEquals(m.code, 401)
|
| -
|
| - def testWrongDomainRegister(self):
|
| - r = sip.Request("REGISTER", "sip:wrong.com")
|
| - r.addHeader("to", "sip:joe@bell.example.com")
|
| - r.addHeader("contact", "sip:joe@client.com:1234")
|
| - r.addHeader("via", sip.Via("client.com").toString())
|
| - self.proxy.datagramReceived(r.toString(), ("client.com", 5060))
|
| - self.assertEquals(len(self.sent), 0)
|
| -
|
| - def testWrongToDomainRegister(self):
|
| - r = sip.Request("REGISTER", "sip:bell.example.com")
|
| - r.addHeader("to", "sip:joe@foo.com")
|
| - r.addHeader("contact", "sip:joe@client.com:1234")
|
| - r.addHeader("via", sip.Via("client.com").toString())
|
| - self.proxy.datagramReceived(r.toString(), ("client.com", 5060))
|
| - self.assertEquals(len(self.sent), 0)
|
| -
|
| - def testWrongDomainLookup(self):
|
| - self.register()
|
| - url = sip.URL(username="joe", host="foo.com")
|
| - d = self.proxy.locator.getAddress(url)
|
| - self.assertFailure(d, LookupError)
|
| - return d
|
| -
|
| - def testNoContactLookup(self):
|
| - self.register()
|
| - url = sip.URL(username="jane", host="bell.example.com")
|
| - d = self.proxy.locator.getAddress(url)
|
| - self.assertFailure(d, LookupError)
|
| - return d
|
| -
|
| -
|
| -class Client(sip.Base):
|
| -
|
| - def __init__(self):
|
| - sip.Base.__init__(self)
|
| - self.received = []
|
| - self.deferred = defer.Deferred()
|
| -
|
| - def handle_response(self, response, addr):
|
| - self.received.append(response)
|
| - self.deferred.callback(self.received)
|
| -
|
| -
|
| -class LiveTest(unittest.TestCase):
|
| -
|
| - def setUp(self):
|
| - self.proxy = sip.RegisterProxy(host="127.0.0.1")
|
| - self.registry = sip.InMemoryRegistry("bell.example.com")
|
| - self.proxy.registry = self.proxy.locator = self.registry
|
| - self.serverPort = reactor.listenUDP(0, self.proxy, interface="127.0.0.1")
|
| - self.client = Client()
|
| - self.clientPort = reactor.listenUDP(0, self.client, interface="127.0.0.1")
|
| - self.serverAddress = (self.serverPort.getHost().host,
|
| - self.serverPort.getHost().port)
|
| -
|
| - def tearDown(self):
|
| - for d, uri in self.registry.users.values():
|
| - d.cancel()
|
| - d1 = defer.maybeDeferred(self.clientPort.stopListening)
|
| - d2 = defer.maybeDeferred(self.serverPort.stopListening)
|
| - return defer.gatherResults([d1, d2])
|
| -
|
| - def testRegister(self):
|
| - p = self.clientPort.getHost().port
|
| - r = sip.Request("REGISTER", "sip:bell.example.com")
|
| - r.addHeader("to", "sip:joe@bell.example.com")
|
| - r.addHeader("contact", "sip:joe@127.0.0.1:%d" % p)
|
| - r.addHeader("via", sip.Via("127.0.0.1", port=p).toString())
|
| - self.client.sendMessage(sip.URL(host="127.0.0.1", port=self.serverAddress[1]),
|
| - r)
|
| - d = self.client.deferred
|
| - def check(received):
|
| - self.assertEquals(len(received), 1)
|
| - r = received[0]
|
| - self.assertEquals(r.code, 200)
|
| - d.addCallback(check)
|
| - return d
|
| -
|
| - def testAmoralRPort(self):
|
| - # rport is allowed without a value, apparently because server
|
| - # implementors might be too stupid to check the received port
|
| - # against 5060 and see if they're equal, and because client
|
| - # implementors might be too stupid to bind to port 5060, or set a
|
| - # value on the rport parameter they send if they bind to another
|
| - # port.
|
| - p = self.clientPort.getHost().port
|
| - r = sip.Request("REGISTER", "sip:bell.example.com")
|
| - r.addHeader("to", "sip:joe@bell.example.com")
|
| - r.addHeader("contact", "sip:joe@127.0.0.1:%d" % p)
|
| - r.addHeader("via", sip.Via("127.0.0.1", port=p, rport=True).toString())
|
| - self.client.sendMessage(sip.URL(host="127.0.0.1", port=self.serverAddress[1]),
|
| - r)
|
| - d = self.client.deferred
|
| - def check(received):
|
| - self.assertEquals(len(received), 1)
|
| - r = received[0]
|
| - self.assertEquals(r.code, 200)
|
| - d.addCallback(check)
|
| - return d
|
| -
|
| -
|
| -registerRequest = """
|
| -REGISTER sip:intarweb.us SIP/2.0\r
|
| -Via: SIP/2.0/UDP 192.168.1.100:50609\r
|
| -From: <sip:exarkun@intarweb.us:50609>\r
|
| -To: <sip:exarkun@intarweb.us:50609>\r
|
| -Contact: "exarkun" <sip:exarkun@192.168.1.100:50609>\r
|
| -Call-ID: 94E7E5DAF39111D791C6000393764646@intarweb.us\r
|
| -CSeq: 9898 REGISTER\r
|
| -Expires: 500\r
|
| -User-Agent: X-Lite build 1061\r
|
| -Content-Length: 0\r
|
| -\r
|
| -"""
|
| -
|
| -challengeResponse = """\
|
| -SIP/2.0 401 Unauthorized\r
|
| -Via: SIP/2.0/UDP 192.168.1.100:50609;received=127.0.0.1;rport=5632\r
|
| -To: <sip:exarkun@intarweb.us:50609>\r
|
| -From: <sip:exarkun@intarweb.us:50609>\r
|
| -Call-ID: 94E7E5DAF39111D791C6000393764646@intarweb.us\r
|
| -CSeq: 9898 REGISTER\r
|
| -WWW-Authenticate: Digest nonce="92956076410767313901322208775",opaque="1674186428",qop-options="auth",algorithm="MD5",realm="intarweb.us"\r
|
| -\r
|
| -"""
|
| -
|
| -authRequest = """\
|
| -REGISTER sip:intarweb.us SIP/2.0\r
|
| -Via: SIP/2.0/UDP 192.168.1.100:50609\r
|
| -From: <sip:exarkun@intarweb.us:50609>\r
|
| -To: <sip:exarkun@intarweb.us:50609>\r
|
| -Contact: "exarkun" <sip:exarkun@192.168.1.100:50609>\r
|
| -Call-ID: 94E7E5DAF39111D791C6000393764646@intarweb.us\r
|
| -CSeq: 9899 REGISTER\r
|
| -Expires: 500\r
|
| -Authorization: Digest username="exarkun",realm="intarweb.us",nonce="92956076410767313901322208775",response="4a47980eea31694f997369214292374b",uri="sip:intarweb.us",algorithm=MD5,opaque="1674186428"\r
|
| -User-Agent: X-Lite build 1061\r
|
| -Content-Length: 0\r
|
| -\r
|
| -"""
|
| -
|
| -okResponse = """\
|
| -SIP/2.0 200 OK\r
|
| -Via: SIP/2.0/UDP 192.168.1.100:50609;received=127.0.0.1;rport=5632\r
|
| -To: <sip:exarkun@intarweb.us:50609>\r
|
| -From: <sip:exarkun@intarweb.us:50609>\r
|
| -Call-ID: 94E7E5DAF39111D791C6000393764646@intarweb.us\r
|
| -CSeq: 9899 REGISTER\r
|
| -Contact: sip:exarkun@127.0.0.1:5632\r
|
| -Expires: 3600\r
|
| -Content-Length: 0\r
|
| -\r
|
| -"""
|
| -
|
| -class FakeDigestAuthorizer(sip.DigestAuthorizer):
|
| - def generateNonce(self):
|
| - return '92956076410767313901322208775'
|
| - def generateOpaque(self):
|
| - return '1674186428'
|
| -
|
| -
|
| -class FakeRegistry(sip.InMemoryRegistry):
|
| - """Make sure expiration is always seen to be 3600.
|
| -
|
| - Otherwise slow reactors fail tests incorrectly.
|
| - """
|
| -
|
| - def _cbReg(self, reg):
|
| - if 3600 < reg.secondsToExpiry or reg.secondsToExpiry < 3598:
|
| - raise RuntimeError, "bad seconds to expire: %s" % reg.secondsToExpiry
|
| - reg.secondsToExpiry = 3600
|
| - return reg
|
| -
|
| - def getRegistrationInfo(self, uri):
|
| - return sip.InMemoryRegistry.getRegistrationInfo(self, uri).addCallback(self._cbReg)
|
| -
|
| - def registerAddress(self, domainURL, logicalURL, physicalURL):
|
| - return sip.InMemoryRegistry.registerAddress(self, domainURL, logicalURL, physicalURL).addCallback(self._cbReg)
|
| -
|
| -class AuthorizationTestCase(unittest.TestCase):
|
| - def setUp(self):
|
| - self.proxy = sip.RegisterProxy(host="intarweb.us")
|
| - self.proxy.authorizers = self.proxy.authorizers.copy()
|
| - self.proxy.authorizers['digest'] = FakeDigestAuthorizer()
|
| - self.registry = FakeRegistry("intarweb.us")
|
| - self.proxy.registry = self.proxy.locator = self.registry
|
| - self.transport = proto_helpers.FakeDatagramTransport()
|
| - self.proxy.transport = self.transport
|
| -
|
| - r = TestRealm()
|
| - p = cred.portal.Portal(r)
|
| - c = cred.checkers.InMemoryUsernamePasswordDatabaseDontUse()
|
| - c.addUser('exarkun@intarweb.us', 'password')
|
| - p.registerChecker(c)
|
| - self.proxy.portal = p
|
| -
|
| - def tearDown(self):
|
| - for d, uri in self.registry.users.values():
|
| - d.cancel()
|
| - del self.proxy
|
| -
|
| - def testChallenge(self):
|
| - self.proxy.datagramReceived(registerRequest, ("127.0.0.1", 5632))
|
| - self.assertEquals(
|
| - self.transport.written[-1],
|
| - ((challengeResponse, ("127.0.0.1", 5632)))
|
| - )
|
| - self.transport.written = []
|
| -
|
| - self.proxy.datagramReceived(authRequest, ("127.0.0.1", 5632))
|
| - self.assertEquals(
|
| - self.transport.written[-1],
|
| - ((okResponse, ("127.0.0.1", 5632)))
|
| - )
|
|
|