Index: third_party/twisted_8_1/twisted/web/test/test_web.py |
diff --git a/third_party/twisted_8_1/twisted/web/test/test_web.py b/third_party/twisted_8_1/twisted/web/test/test_web.py |
deleted file mode 100644 |
index 2bbb390ade0e72ed0b4ffefc872c66edceb7d999..0000000000000000000000000000000000000000 |
--- a/third_party/twisted_8_1/twisted/web/test/test_web.py |
+++ /dev/null |
@@ -1,611 +0,0 @@ |
-# Copyright (c) 2001-2007 Twisted Matrix Laboratories. |
-# See LICENSE for details. |
- |
-from twisted.trial import unittest |
-from cStringIO import StringIO |
- |
-from twisted.web import server, resource, util |
-from twisted.internet import defer, interfaces, error, task |
-from twisted.web import http |
-from twisted.python import log |
-from twisted.internet.address import IPv4Address |
-from zope.interface import implements |
- |
-class DummyRequest: |
- uri='http://dummy/' |
- method = 'GET' |
- |
- def getHeader(self, h): |
- return None |
- |
- def registerProducer(self, prod,s): |
- self.go = 1 |
- while self.go: |
- prod.resumeProducing() |
- |
- def unregisterProducer(self): |
- self.go = 0 |
- |
- def __init__(self, postpath, session=None): |
- self.sitepath = [] |
- self.written = [] |
- self.finished = 0 |
- self.postpath = postpath |
- self.prepath = [] |
- self.session = None |
- self.protoSession = session or server.Session(0, self) |
- self.args = {} |
- self.outgoingHeaders = {} |
- |
- def setHeader(self, name, value): |
- """TODO: make this assert on write() if the header is content-length |
- """ |
- self.outgoingHeaders[name.lower()] = value |
- |
- def getSession(self): |
- if self.session: |
- return self.session |
- assert not self.written, "Session cannot be requested after data has been written." |
- self.session = self.protoSession |
- return self.session |
- def write(self, data): |
- self.written.append(data) |
- def finish(self): |
- self.finished = self.finished + 1 |
- def addArg(self, name, value): |
- self.args[name] = [value] |
- def setResponseCode(self, code): |
- assert not self.written, "Response code cannot be set after data has been written: %s." % "@@@@".join(self.written) |
- def setLastModified(self, when): |
- assert not self.written, "Last-Modified cannot be set after data has been written: %s." % "@@@@".join(self.written) |
- def setETag(self, tag): |
- assert not self.written, "ETag cannot be set after data has been written: %s." % "@@@@".join(self.written) |
- |
-class ResourceTestCase(unittest.TestCase): |
- def testListEntities(self): |
- r = resource.Resource() |
- self.failUnlessEqual([], r.listEntities()) |
- |
- |
-class SimpleResource(resource.Resource): |
- def render(self, request): |
- if http.CACHED in (request.setLastModified(10), |
- request.setETag('MatchingTag')): |
- return '' |
- else: |
- return "correct" |
- |
-class SiteTest(unittest.TestCase): |
- def testSimplestSite(self): |
- sres1 = SimpleResource() |
- sres2 = SimpleResource() |
- sres1.putChild("",sres2) |
- site = server.Site(sres1) |
- assert site.getResourceFor(DummyRequest([''])) is sres2, "Got the wrong resource." |
- |
- |
- |
-class SessionTest(unittest.TestCase): |
- |
- def setUp(self): |
- """ |
- Set up a session using a simulated scheduler. Creates a |
- C{times} attribute which specifies the return values of the |
- session's C{_getTime} method. |
- """ |
- clock = self.clock = task.Clock() |
- times = self.times = [] |
- |
- class MockSession(server.Session): |
- """ |
- A mock L{server.Session} object which fakes out scheduling |
- with the C{clock} attribute and fakes out the current time |
- to be the elements of L{SessionTest}'s C{times} attribute. |
- """ |
- def loopFactory(self, *a, **kw): |
- """ |
- Create a L{task.LoopingCall} which uses |
- L{SessionTest}'s C{clock} attribute. |
- """ |
- call = task.LoopingCall(*a, **kw) |
- call.clock = clock |
- return call |
- |
- def _getTime(self): |
- return times.pop(0) |
- |
- self.site = server.Site(SimpleResource()) |
- self.site.sessionFactory = MockSession |
- |
- |
- def test_basicExpiration(self): |
- """ |
- Test session expiration: setup a session, and simulate an expiration |
- time. |
- """ |
- self.times.extend([0, server.Session.sessionTimeout + 1]) |
- session = self.site.makeSession() |
- hasExpired = [False] |
- def cbExpire(): |
- hasExpired[0] = True |
- session.notifyOnExpire(cbExpire) |
- self.clock.advance(server.Site.sessionCheckTime - 1) |
- # Looping call should not have been executed |
- self.failIf(hasExpired[0]) |
- |
- self.clock.advance(1) |
- |
- self.failUnless(hasExpired[0]) |
- |
- |
- def test_delayedCallCleanup(self): |
- """ |
- Checking to make sure Sessions do not leave extra DelayedCalls. |
- """ |
- self.times.extend([0, 100]) |
- |
- session = self.site.makeSession() |
- loop = session.checkExpiredLoop |
- session.touch() |
- self.failUnless(loop.running) |
- |
- session.expire() |
- |
- self.failIf(self.clock.calls) |
- self.failIf(loop.running) |
- |
- |
- |
-# Conditional requests: |
-# If-None-Match, If-Modified-Since |
- |
-# make conditional request: |
-# normal response if condition succeeds |
-# if condition fails: |
-# response code |
-# no body |
- |
-def httpBody(whole): |
- return whole.split('\r\n\r\n', 1)[1] |
- |
-def httpHeader(whole, key): |
- key = key.lower() |
- headers = whole.split('\r\n\r\n', 1)[0] |
- for header in headers.split('\r\n'): |
- if header.lower().startswith(key): |
- return header.split(':', 1)[1].strip() |
- return None |
- |
-def httpCode(whole): |
- l1 = whole.split('\r\n', 1)[0] |
- return int(l1.split()[1]) |
- |
-class ConditionalTest(unittest.TestCase): |
- """web.server's handling of conditional requests for cache validation.""" |
- |
- # XXX: test web.distrib. |
- |
- def setUp(self): |
- self.resrc = SimpleResource() |
- self.resrc.putChild('', self.resrc) |
- self.site = server.Site(self.resrc) |
- self.site = server.Site(self.resrc) |
- self.site.logFile = log.logfile |
- |
- # HELLLLLLLLLLP! This harness is Very Ugly. |
- self.channel = self.site.buildProtocol(None) |
- self.transport = http.StringTransport() |
- self.transport.close = lambda *a, **kw: None |
- self.transport.disconnecting = lambda *a, **kw: 0 |
- self.transport.getPeer = lambda *a, **kw: "peer" |
- self.transport.getHost = lambda *a, **kw: "host" |
- self.channel.makeConnection(self.transport) |
- for l in ["GET / HTTP/1.1", |
- "Accept: text/html"]: |
- self.channel.lineReceived(l) |
- |
- def tearDown(self): |
- self.channel.connectionLost(None) |
- |
- def test_modified(self): |
- """If-Modified-Since cache validator (positive)""" |
- self.channel.lineReceived("If-Modified-Since: %s" |
- % http.datetimeToString(1)) |
- self.channel.lineReceived('') |
- result = self.transport.getvalue() |
- self.failUnlessEqual(httpCode(result), http.OK) |
- self.failUnlessEqual(httpBody(result), "correct") |
- |
- def test_unmodified(self): |
- """If-Modified-Since cache validator (negative)""" |
- self.channel.lineReceived("If-Modified-Since: %s" |
- % http.datetimeToString(100)) |
- self.channel.lineReceived('') |
- result = self.transport.getvalue() |
- self.failUnlessEqual(httpCode(result), http.NOT_MODIFIED) |
- self.failUnlessEqual(httpBody(result), "") |
- |
- def test_etagMatchedNot(self): |
- """If-None-Match ETag cache validator (positive)""" |
- self.channel.lineReceived("If-None-Match: unmatchedTag") |
- self.channel.lineReceived('') |
- result = self.transport.getvalue() |
- self.failUnlessEqual(httpCode(result), http.OK) |
- self.failUnlessEqual(httpBody(result), "correct") |
- |
- def test_etagMatched(self): |
- """If-None-Match ETag cache validator (negative)""" |
- self.channel.lineReceived("If-None-Match: MatchingTag") |
- self.channel.lineReceived('') |
- result = self.transport.getvalue() |
- self.failUnlessEqual(httpHeader(result, "ETag"), "MatchingTag") |
- self.failUnlessEqual(httpCode(result), http.NOT_MODIFIED) |
- self.failUnlessEqual(httpBody(result), "") |
- |
-from twisted.web import google |
-class GoogleTestCase(unittest.TestCase): |
- def testCheckGoogle(self): |
- raise unittest.SkipTest("no violation of google ToS") |
- d = google.checkGoogle('site:www.twistedmatrix.com twisted') |
- d.addCallback(self.assertEquals, 'http://twistedmatrix.com/') |
- return d |
- |
-from twisted.web import static |
-from twisted.web import script |
- |
-class StaticFileTest(unittest.TestCase): |
- |
- def testStaticPaths(self): |
- import os |
- dp = os.path.join(self.mktemp(),"hello") |
- ddp = os.path.join(dp, "goodbye") |
- tp = os.path.abspath(os.path.join(dp,"world.txt")) |
- tpy = os.path.join(dp,"wyrld.rpy") |
- os.makedirs(dp) |
- f = open(tp,"wb") |
- f.write("hello world") |
- f = open(tpy, "wb") |
- f.write(""" |
-from twisted.web.static import Data |
-resource = Data('dynamic world','text/plain') |
-""") |
- f = static.File(dp) |
- f.processors = { |
- '.rpy': script.ResourceScript, |
- } |
- |
- f.indexNames = f.indexNames + ['world.txt'] |
- self.assertEquals(f.getChild('', DummyRequest([''])).path, |
- tp) |
- self.assertEquals(f.getChild('wyrld.rpy', DummyRequest(['wyrld.rpy']) |
- ).__class__, |
- static.Data) |
- f = static.File(dp) |
- wtextr = DummyRequest(['world.txt']) |
- wtext = f.getChild('world.txt', wtextr) |
- self.assertEquals(wtext.path, tp) |
- wtext.render(wtextr) |
- self.assertEquals(wtextr.outgoingHeaders.get('content-length'), |
- str(len('hello world'))) |
- self.assertNotEquals(f.getChild('', DummyRequest([''])).__class__, |
- static.File) |
- |
- def testIgnoreExt(self): |
- f = static.File(".") |
- f.ignoreExt(".foo") |
- self.assertEquals(f.ignoredExts, [".foo"]) |
- f = static.File(".") |
- self.assertEquals(f.ignoredExts, []) |
- f = static.File(".", ignoredExts=(".bar", ".baz")) |
- self.assertEquals(f.ignoredExts, [".bar", ".baz"]) |
- |
- def testIgnoredExts(self): |
- import os |
- dp = os.path.join(self.mktemp(), 'allYourBase') |
- fp = os.path.join(dp, 'AreBelong.ToUs') |
- os.makedirs(dp) |
- open(fp, 'wb').write("Take off every 'Zig'!!") |
- f = static.File(dp) |
- f.ignoreExt('.ToUs') |
- dreq = DummyRequest(['']) |
- child_without_ext = f.getChild('AreBelong', dreq) |
- self.assertNotEquals(child_without_ext, f.childNotFound) |
- |
-class DummyChannel: |
- class TCP: |
- port = 80 |
- def getPeer(self): |
- return IPv4Address("TCP", 'client.example.com', 12344) |
- def getHost(self): |
- return IPv4Address("TCP", 'example.com', self.port) |
- class SSL(TCP): |
- implements(interfaces.ISSLTransport) |
- transport = TCP() |
- site = server.Site(resource.Resource()) |
- |
-class TestRequest(unittest.TestCase): |
- |
- def testChildLink(self): |
- request = server.Request(DummyChannel(), 1) |
- request.gotLength(0) |
- request.requestReceived('GET', '/foo/bar', 'HTTP/1.0') |
- self.assertEqual(request.childLink('baz'), 'bar/baz') |
- request = server.Request(DummyChannel(), 1) |
- request.gotLength(0) |
- request.requestReceived('GET', '/foo/bar/', 'HTTP/1.0') |
- self.assertEqual(request.childLink('baz'), 'baz') |
- |
- def testPrePathURLSimple(self): |
- request = server.Request(DummyChannel(), 1) |
- request.gotLength(0) |
- request.requestReceived('GET', '/foo/bar', 'HTTP/1.0') |
- request.setHost('example.com', 80) |
- self.assertEqual(request.prePathURL(), 'http://example.com/foo/bar') |
- |
- def testPrePathURLNonDefault(self): |
- d = DummyChannel() |
- d.transport = DummyChannel.TCP() |
- d.transport.port = 81 |
- request = server.Request(d, 1) |
- request.setHost('example.com', 81) |
- request.gotLength(0) |
- request.requestReceived('GET', '/foo/bar', 'HTTP/1.0') |
- self.assertEqual(request.prePathURL(), 'http://example.com:81/foo/bar') |
- |
- def testPrePathURLSSLPort(self): |
- d = DummyChannel() |
- d.transport = DummyChannel.TCP() |
- d.transport.port = 443 |
- request = server.Request(d, 1) |
- request.setHost('example.com', 443) |
- request.gotLength(0) |
- request.requestReceived('GET', '/foo/bar', 'HTTP/1.0') |
- self.assertEqual(request.prePathURL(), 'http://example.com:443/foo/bar') |
- |
- def testPrePathURLSSLPortAndSSL(self): |
- d = DummyChannel() |
- d.transport = DummyChannel.SSL() |
- d.transport.port = 443 |
- request = server.Request(d, 1) |
- request.setHost('example.com', 443) |
- request.gotLength(0) |
- request.requestReceived('GET', '/foo/bar', 'HTTP/1.0') |
- self.assertEqual(request.prePathURL(), 'https://example.com/foo/bar') |
- |
- def testPrePathURLHTTPPortAndSSL(self): |
- d = DummyChannel() |
- d.transport = DummyChannel.SSL() |
- d.transport.port = 80 |
- request = server.Request(d, 1) |
- request.setHost('example.com', 80) |
- request.gotLength(0) |
- request.requestReceived('GET', '/foo/bar', 'HTTP/1.0') |
- self.assertEqual(request.prePathURL(), 'https://example.com:80/foo/bar') |
- |
- def testPrePathURLSSLNonDefault(self): |
- d = DummyChannel() |
- d.transport = DummyChannel.SSL() |
- d.transport.port = 81 |
- request = server.Request(d, 1) |
- request.setHost('example.com', 81) |
- request.gotLength(0) |
- request.requestReceived('GET', '/foo/bar', 'HTTP/1.0') |
- self.assertEqual(request.prePathURL(), 'https://example.com:81/foo/bar') |
- |
- def testPrePathURLSetSSLHost(self): |
- d = DummyChannel() |
- d.transport = DummyChannel.TCP() |
- d.transport.port = 81 |
- request = server.Request(d, 1) |
- request.setHost('foo.com', 81, 1) |
- request.gotLength(0) |
- request.requestReceived('GET', '/foo/bar', 'HTTP/1.0') |
- self.assertEqual(request.prePathURL(), 'https://foo.com:81/foo/bar') |
- |
- |
- def test_prePathURLQuoting(self): |
- """ |
- L{Request.prePathURL} quotes special characters in the URL segments to |
- preserve the original meaning. |
- """ |
- d = DummyChannel() |
- request = server.Request(d, 1) |
- request.setHost('example.com', 80) |
- request.gotLength(0) |
- request.requestReceived('GET', '/foo%2Fbar', 'HTTP/1.0') |
- self.assertEqual(request.prePathURL(), 'http://example.com/foo%2Fbar') |
- |
- |
- def testNotifyFinishConnectionLost(self): |
- d = DummyChannel() |
- d.transport = DummyChannel.TCP() |
- request = server.Request(d, 1) |
- finished = request.notifyFinish() |
- request.connectionLost(error.ConnectionDone("Connection done")) |
- return self.assertFailure(finished, error.ConnectionDone) |
- |
- |
-class RootResource(resource.Resource): |
- isLeaf=0 |
- def getChildWithDefault(self, name, request): |
- request.rememberRootURL() |
- return resource.Resource.getChildWithDefault(self, name, request) |
- def render(self, request): |
- return '' |
- |
-class RememberURLTest(unittest.TestCase): |
- def createServer(self, r): |
- chan = DummyChannel() |
- chan.transport = DummyChannel.TCP() |
- chan.site = server.Site(r) |
- return chan |
- |
- def testSimple(self): |
- r = resource.Resource() |
- r.isLeaf=0 |
- rr = RootResource() |
- r.putChild('foo', rr) |
- rr.putChild('', rr) |
- rr.putChild('bar', resource.Resource()) |
- chan = self.createServer(r) |
- for url in ['/foo/', '/foo/bar', '/foo/bar/baz', '/foo/bar/']: |
- request = server.Request(chan, 1) |
- request.setHost('example.com', 81) |
- request.gotLength(0) |
- request.requestReceived('GET', url, 'HTTP/1.0') |
- self.assertEqual(request.getRootURL(), "http://example.com/foo") |
- |
- def testRoot(self): |
- rr = RootResource() |
- rr.putChild('', rr) |
- rr.putChild('bar', resource.Resource()) |
- chan = self.createServer(rr) |
- for url in ['/', '/bar', '/bar/baz', '/bar/']: |
- request = server.Request(chan, 1) |
- request.setHost('example.com', 81) |
- request.gotLength(0) |
- request.requestReceived('GET', url, 'HTTP/1.0') |
- self.assertEqual(request.getRootURL(), "http://example.com/") |
- |
- |
-class NewRenderResource(resource.Resource): |
- def render_GET(self, request): |
- return "hi hi" |
- |
- def render_HEH(self, request): |
- return "ho ho" |
- |
- |
-class NewRenderTestCase(unittest.TestCase): |
- def _getReq(self): |
- d = DummyChannel() |
- d.site.resource.putChild('newrender', NewRenderResource()) |
- d.transport = DummyChannel.TCP() |
- d.transport.port = 81 |
- request = server.Request(d, 1) |
- request.setHost('example.com', 81) |
- request.gotLength(0) |
- return request |
- |
- def testGoodMethods(self): |
- req = self._getReq() |
- req.requestReceived('GET', '/newrender', 'HTTP/1.0') |
- self.assertEquals(req.transport.getvalue().splitlines()[-1], 'hi hi') |
- |
- req = self._getReq() |
- req.requestReceived('HEH', '/newrender', 'HTTP/1.0') |
- self.assertEquals(req.transport.getvalue().splitlines()[-1], 'ho ho') |
- |
- def testBadMethods(self): |
- req = self._getReq() |
- req.requestReceived('CONNECT', '/newrender', 'HTTP/1.0') |
- self.assertEquals(req.code, 501) |
- |
- req = self._getReq() |
- req.requestReceived('hlalauguG', '/newrender', 'HTTP/1.0') |
- self.assertEquals(req.code, 501) |
- |
- def testImplicitHead(self): |
- req = self._getReq() |
- req.requestReceived('HEAD', '/newrender', 'HTTP/1.0') |
- self.assertEquals(req.code, 200) |
- self.assertEquals(-1, req.transport.getvalue().find('hi hi')) |
- |
- |
-class SDResource(resource.Resource): |
- def __init__(self,default): self.default=default |
- def getChildWithDefault(self,name,request): |
- d=defer.succeed(self.default) |
- return util.DeferredResource(d).getChildWithDefault(name, request) |
- |
-class SDTest(unittest.TestCase): |
- |
- def testDeferredResource(self): |
- r = resource.Resource() |
- r.isLeaf = 1 |
- s = SDResource(r) |
- d = DummyRequest(['foo', 'bar', 'baz']) |
- resource.getChildForRequest(s, d) |
- self.assertEqual(d.postpath, ['bar', 'baz']) |
- |
-class DummyRequestForLogTest(DummyRequest): |
- uri='/dummy' # parent class uri has "http://", which doesn't really happen |
- code = 123 |
- client = '1.2.3.4' |
- clientproto = 'HTTP/1.0' |
- sentLength = None |
- |
- def __init__(self, *a, **kw): |
- DummyRequest.__init__(self, *a, **kw) |
- self.headers = {} |
- |
- def getHeader(self, h): |
- return self.headers.get(h.lower(), None) |
- |
- def getClientIP(self): |
- return self.client |
- |
-class TestLogEscaping(unittest.TestCase): |
- def setUp(self): |
- self.site = http.HTTPFactory() |
- self.site.logFile = StringIO() |
- self.request = DummyRequestForLogTest(self.site, False) |
- |
- def testSimple(self): |
- http._logDateTime = "[%02d/%3s/%4d:%02d:%02d:%02d +0000]" % ( |
- 25, 'Oct', 2004, 12, 31, 59) |
- self.site.log(self.request) |
- self.site.logFile.seek(0) |
- self.assertEqual( |
- self.site.logFile.read(), |
- '1.2.3.4 - - [25/Oct/2004:12:31:59 +0000] "GET /dummy HTTP/1.0" 123 - "-" "-"\n') |
- |
- def testMethodQuote(self): |
- http._logDateTime = "[%02d/%3s/%4d:%02d:%02d:%02d +0000]" % ( |
- 25, 'Oct', 2004, 12, 31, 59) |
- self.request.method = 'G"T' |
- self.site.log(self.request) |
- self.site.logFile.seek(0) |
- self.assertEqual( |
- self.site.logFile.read(), |
- '1.2.3.4 - - [25/Oct/2004:12:31:59 +0000] "G\\"T /dummy HTTP/1.0" 123 - "-" "-"\n') |
- |
- def testRequestQuote(self): |
- http._logDateTime = "[%02d/%3s/%4d:%02d:%02d:%02d +0000]" % ( |
- 25, 'Oct', 2004, 12, 31, 59) |
- self.request.uri='/dummy"withquote' |
- self.site.log(self.request) |
- self.site.logFile.seek(0) |
- self.assertEqual( |
- self.site.logFile.read(), |
- '1.2.3.4 - - [25/Oct/2004:12:31:59 +0000] "GET /dummy\\"withquote HTTP/1.0" 123 - "-" "-"\n') |
- |
- def testProtoQuote(self): |
- http._logDateTime = "[%02d/%3s/%4d:%02d:%02d:%02d +0000]" % ( |
- 25, 'Oct', 2004, 12, 31, 59) |
- self.request.clientproto='HT"P/1.0' |
- self.site.log(self.request) |
- self.site.logFile.seek(0) |
- self.assertEqual( |
- self.site.logFile.read(), |
- '1.2.3.4 - - [25/Oct/2004:12:31:59 +0000] "GET /dummy HT\\"P/1.0" 123 - "-" "-"\n') |
- |
- def testRefererQuote(self): |
- http._logDateTime = "[%02d/%3s/%4d:%02d:%02d:%02d +0000]" % ( |
- 25, 'Oct', 2004, 12, 31, 59) |
- self.request.headers['referer'] = 'http://malicious" ".website.invalid' |
- self.site.log(self.request) |
- self.site.logFile.seek(0) |
- self.assertEqual( |
- self.site.logFile.read(), |
- '1.2.3.4 - - [25/Oct/2004:12:31:59 +0000] "GET /dummy HTTP/1.0" 123 - "http://malicious\\" \\".website.invalid" "-"\n') |
- |
- def testUserAgentQuote(self): |
- http._logDateTime = "[%02d/%3s/%4d:%02d:%02d:%02d +0000]" % ( |
- 25, 'Oct', 2004, 12, 31, 59) |
- self.request.headers['user-agent'] = 'Malicious Web" Evil' |
- self.site.log(self.request) |
- self.site.logFile.seek(0) |
- self.assertEqual( |
- self.site.logFile.read(), |
- '1.2.3.4 - - [25/Oct/2004:12:31:59 +0000] "GET /dummy HTTP/1.0" 123 - "-" "Malicious Web\\" Evil"\n') |