Chromium Code Reviews
chromiumcodereview-hr@appspot.gserviceaccount.com (chromiumcodereview-hr) | Please choose your nickname with Settings | Help | Chromium Project | Gerrit Changes | Sign out
(207)

Unified Diff: third_party/twisted_8_1/twisted/web/test/test_web.py

Issue 12261012: Remove third_party/twisted_8_1 (Closed) Base URL: svn://svn.chromium.org/chrome/trunk/tools/build
Patch Set: Created 7 years, 10 months ago
Use n/p to move between diff chunks; N/P to move between comments. Draft comments are only viewable by you.
Jump to:
View side-by-side diff with in-line comments
Download patch
Index: third_party/twisted_8_1/twisted/web/test/test_web.py
diff --git a/third_party/twisted_8_1/twisted/web/test/test_web.py b/third_party/twisted_8_1/twisted/web/test/test_web.py
deleted file mode 100644
index 2bbb390ade0e72ed0b4ffefc872c66edceb7d999..0000000000000000000000000000000000000000
--- a/third_party/twisted_8_1/twisted/web/test/test_web.py
+++ /dev/null
@@ -1,611 +0,0 @@
-# Copyright (c) 2001-2007 Twisted Matrix Laboratories.
-# See LICENSE for details.
-
-from twisted.trial import unittest
-from cStringIO import StringIO
-
-from twisted.web import server, resource, util
-from twisted.internet import defer, interfaces, error, task
-from twisted.web import http
-from twisted.python import log
-from twisted.internet.address import IPv4Address
-from zope.interface import implements
-
-class DummyRequest:
- uri='http://dummy/'
- method = 'GET'
-
- def getHeader(self, h):
- return None
-
- def registerProducer(self, prod,s):
- self.go = 1
- while self.go:
- prod.resumeProducing()
-
- def unregisterProducer(self):
- self.go = 0
-
- def __init__(self, postpath, session=None):
- self.sitepath = []
- self.written = []
- self.finished = 0
- self.postpath = postpath
- self.prepath = []
- self.session = None
- self.protoSession = session or server.Session(0, self)
- self.args = {}
- self.outgoingHeaders = {}
-
- def setHeader(self, name, value):
- """TODO: make this assert on write() if the header is content-length
- """
- self.outgoingHeaders[name.lower()] = value
-
- def getSession(self):
- if self.session:
- return self.session
- assert not self.written, "Session cannot be requested after data has been written."
- self.session = self.protoSession
- return self.session
- def write(self, data):
- self.written.append(data)
- def finish(self):
- self.finished = self.finished + 1
- def addArg(self, name, value):
- self.args[name] = [value]
- def setResponseCode(self, code):
- assert not self.written, "Response code cannot be set after data has been written: %s." % "@@@@".join(self.written)
- def setLastModified(self, when):
- assert not self.written, "Last-Modified cannot be set after data has been written: %s." % "@@@@".join(self.written)
- def setETag(self, tag):
- assert not self.written, "ETag cannot be set after data has been written: %s." % "@@@@".join(self.written)
-
-class ResourceTestCase(unittest.TestCase):
- def testListEntities(self):
- r = resource.Resource()
- self.failUnlessEqual([], r.listEntities())
-
-
-class SimpleResource(resource.Resource):
- def render(self, request):
- if http.CACHED in (request.setLastModified(10),
- request.setETag('MatchingTag')):
- return ''
- else:
- return "correct"
-
-class SiteTest(unittest.TestCase):
- def testSimplestSite(self):
- sres1 = SimpleResource()
- sres2 = SimpleResource()
- sres1.putChild("",sres2)
- site = server.Site(sres1)
- assert site.getResourceFor(DummyRequest([''])) is sres2, "Got the wrong resource."
-
-
-
-class SessionTest(unittest.TestCase):
-
- def setUp(self):
- """
- Set up a session using a simulated scheduler. Creates a
- C{times} attribute which specifies the return values of the
- session's C{_getTime} method.
- """
- clock = self.clock = task.Clock()
- times = self.times = []
-
- class MockSession(server.Session):
- """
- A mock L{server.Session} object which fakes out scheduling
- with the C{clock} attribute and fakes out the current time
- to be the elements of L{SessionTest}'s C{times} attribute.
- """
- def loopFactory(self, *a, **kw):
- """
- Create a L{task.LoopingCall} which uses
- L{SessionTest}'s C{clock} attribute.
- """
- call = task.LoopingCall(*a, **kw)
- call.clock = clock
- return call
-
- def _getTime(self):
- return times.pop(0)
-
- self.site = server.Site(SimpleResource())
- self.site.sessionFactory = MockSession
-
-
- def test_basicExpiration(self):
- """
- Test session expiration: setup a session, and simulate an expiration
- time.
- """
- self.times.extend([0, server.Session.sessionTimeout + 1])
- session = self.site.makeSession()
- hasExpired = [False]
- def cbExpire():
- hasExpired[0] = True
- session.notifyOnExpire(cbExpire)
- self.clock.advance(server.Site.sessionCheckTime - 1)
- # Looping call should not have been executed
- self.failIf(hasExpired[0])
-
- self.clock.advance(1)
-
- self.failUnless(hasExpired[0])
-
-
- def test_delayedCallCleanup(self):
- """
- Checking to make sure Sessions do not leave extra DelayedCalls.
- """
- self.times.extend([0, 100])
-
- session = self.site.makeSession()
- loop = session.checkExpiredLoop
- session.touch()
- self.failUnless(loop.running)
-
- session.expire()
-
- self.failIf(self.clock.calls)
- self.failIf(loop.running)
-
-
-
-# Conditional requests:
-# If-None-Match, If-Modified-Since
-
-# make conditional request:
-# normal response if condition succeeds
-# if condition fails:
-# response code
-# no body
-
-def httpBody(whole):
- return whole.split('\r\n\r\n', 1)[1]
-
-def httpHeader(whole, key):
- key = key.lower()
- headers = whole.split('\r\n\r\n', 1)[0]
- for header in headers.split('\r\n'):
- if header.lower().startswith(key):
- return header.split(':', 1)[1].strip()
- return None
-
-def httpCode(whole):
- l1 = whole.split('\r\n', 1)[0]
- return int(l1.split()[1])
-
-class ConditionalTest(unittest.TestCase):
- """web.server's handling of conditional requests for cache validation."""
-
- # XXX: test web.distrib.
-
- def setUp(self):
- self.resrc = SimpleResource()
- self.resrc.putChild('', self.resrc)
- self.site = server.Site(self.resrc)
- self.site = server.Site(self.resrc)
- self.site.logFile = log.logfile
-
- # HELLLLLLLLLLP! This harness is Very Ugly.
- self.channel = self.site.buildProtocol(None)
- self.transport = http.StringTransport()
- self.transport.close = lambda *a, **kw: None
- self.transport.disconnecting = lambda *a, **kw: 0
- self.transport.getPeer = lambda *a, **kw: "peer"
- self.transport.getHost = lambda *a, **kw: "host"
- self.channel.makeConnection(self.transport)
- for l in ["GET / HTTP/1.1",
- "Accept: text/html"]:
- self.channel.lineReceived(l)
-
- def tearDown(self):
- self.channel.connectionLost(None)
-
- def test_modified(self):
- """If-Modified-Since cache validator (positive)"""
- self.channel.lineReceived("If-Modified-Since: %s"
- % http.datetimeToString(1))
- self.channel.lineReceived('')
- result = self.transport.getvalue()
- self.failUnlessEqual(httpCode(result), http.OK)
- self.failUnlessEqual(httpBody(result), "correct")
-
- def test_unmodified(self):
- """If-Modified-Since cache validator (negative)"""
- self.channel.lineReceived("If-Modified-Since: %s"
- % http.datetimeToString(100))
- self.channel.lineReceived('')
- result = self.transport.getvalue()
- self.failUnlessEqual(httpCode(result), http.NOT_MODIFIED)
- self.failUnlessEqual(httpBody(result), "")
-
- def test_etagMatchedNot(self):
- """If-None-Match ETag cache validator (positive)"""
- self.channel.lineReceived("If-None-Match: unmatchedTag")
- self.channel.lineReceived('')
- result = self.transport.getvalue()
- self.failUnlessEqual(httpCode(result), http.OK)
- self.failUnlessEqual(httpBody(result), "correct")
-
- def test_etagMatched(self):
- """If-None-Match ETag cache validator (negative)"""
- self.channel.lineReceived("If-None-Match: MatchingTag")
- self.channel.lineReceived('')
- result = self.transport.getvalue()
- self.failUnlessEqual(httpHeader(result, "ETag"), "MatchingTag")
- self.failUnlessEqual(httpCode(result), http.NOT_MODIFIED)
- self.failUnlessEqual(httpBody(result), "")
-
-from twisted.web import google
-class GoogleTestCase(unittest.TestCase):
- def testCheckGoogle(self):
- raise unittest.SkipTest("no violation of google ToS")
- d = google.checkGoogle('site:www.twistedmatrix.com twisted')
- d.addCallback(self.assertEquals, 'http://twistedmatrix.com/')
- return d
-
-from twisted.web import static
-from twisted.web import script
-
-class StaticFileTest(unittest.TestCase):
-
- def testStaticPaths(self):
- import os
- dp = os.path.join(self.mktemp(),"hello")
- ddp = os.path.join(dp, "goodbye")
- tp = os.path.abspath(os.path.join(dp,"world.txt"))
- tpy = os.path.join(dp,"wyrld.rpy")
- os.makedirs(dp)
- f = open(tp,"wb")
- f.write("hello world")
- f = open(tpy, "wb")
- f.write("""
-from twisted.web.static import Data
-resource = Data('dynamic world','text/plain')
-""")
- f = static.File(dp)
- f.processors = {
- '.rpy': script.ResourceScript,
- }
-
- f.indexNames = f.indexNames + ['world.txt']
- self.assertEquals(f.getChild('', DummyRequest([''])).path,
- tp)
- self.assertEquals(f.getChild('wyrld.rpy', DummyRequest(['wyrld.rpy'])
- ).__class__,
- static.Data)
- f = static.File(dp)
- wtextr = DummyRequest(['world.txt'])
- wtext = f.getChild('world.txt', wtextr)
- self.assertEquals(wtext.path, tp)
- wtext.render(wtextr)
- self.assertEquals(wtextr.outgoingHeaders.get('content-length'),
- str(len('hello world')))
- self.assertNotEquals(f.getChild('', DummyRequest([''])).__class__,
- static.File)
-
- def testIgnoreExt(self):
- f = static.File(".")
- f.ignoreExt(".foo")
- self.assertEquals(f.ignoredExts, [".foo"])
- f = static.File(".")
- self.assertEquals(f.ignoredExts, [])
- f = static.File(".", ignoredExts=(".bar", ".baz"))
- self.assertEquals(f.ignoredExts, [".bar", ".baz"])
-
- def testIgnoredExts(self):
- import os
- dp = os.path.join(self.mktemp(), 'allYourBase')
- fp = os.path.join(dp, 'AreBelong.ToUs')
- os.makedirs(dp)
- open(fp, 'wb').write("Take off every 'Zig'!!")
- f = static.File(dp)
- f.ignoreExt('.ToUs')
- dreq = DummyRequest([''])
- child_without_ext = f.getChild('AreBelong', dreq)
- self.assertNotEquals(child_without_ext, f.childNotFound)
-
-class DummyChannel:
- class TCP:
- port = 80
- def getPeer(self):
- return IPv4Address("TCP", 'client.example.com', 12344)
- def getHost(self):
- return IPv4Address("TCP", 'example.com', self.port)
- class SSL(TCP):
- implements(interfaces.ISSLTransport)
- transport = TCP()
- site = server.Site(resource.Resource())
-
-class TestRequest(unittest.TestCase):
-
- def testChildLink(self):
- request = server.Request(DummyChannel(), 1)
- request.gotLength(0)
- request.requestReceived('GET', '/foo/bar', 'HTTP/1.0')
- self.assertEqual(request.childLink('baz'), 'bar/baz')
- request = server.Request(DummyChannel(), 1)
- request.gotLength(0)
- request.requestReceived('GET', '/foo/bar/', 'HTTP/1.0')
- self.assertEqual(request.childLink('baz'), 'baz')
-
- def testPrePathURLSimple(self):
- request = server.Request(DummyChannel(), 1)
- request.gotLength(0)
- request.requestReceived('GET', '/foo/bar', 'HTTP/1.0')
- request.setHost('example.com', 80)
- self.assertEqual(request.prePathURL(), 'http://example.com/foo/bar')
-
- def testPrePathURLNonDefault(self):
- d = DummyChannel()
- d.transport = DummyChannel.TCP()
- d.transport.port = 81
- request = server.Request(d, 1)
- request.setHost('example.com', 81)
- request.gotLength(0)
- request.requestReceived('GET', '/foo/bar', 'HTTP/1.0')
- self.assertEqual(request.prePathURL(), 'http://example.com:81/foo/bar')
-
- def testPrePathURLSSLPort(self):
- d = DummyChannel()
- d.transport = DummyChannel.TCP()
- d.transport.port = 443
- request = server.Request(d, 1)
- request.setHost('example.com', 443)
- request.gotLength(0)
- request.requestReceived('GET', '/foo/bar', 'HTTP/1.0')
- self.assertEqual(request.prePathURL(), 'http://example.com:443/foo/bar')
-
- def testPrePathURLSSLPortAndSSL(self):
- d = DummyChannel()
- d.transport = DummyChannel.SSL()
- d.transport.port = 443
- request = server.Request(d, 1)
- request.setHost('example.com', 443)
- request.gotLength(0)
- request.requestReceived('GET', '/foo/bar', 'HTTP/1.0')
- self.assertEqual(request.prePathURL(), 'https://example.com/foo/bar')
-
- def testPrePathURLHTTPPortAndSSL(self):
- d = DummyChannel()
- d.transport = DummyChannel.SSL()
- d.transport.port = 80
- request = server.Request(d, 1)
- request.setHost('example.com', 80)
- request.gotLength(0)
- request.requestReceived('GET', '/foo/bar', 'HTTP/1.0')
- self.assertEqual(request.prePathURL(), 'https://example.com:80/foo/bar')
-
- def testPrePathURLSSLNonDefault(self):
- d = DummyChannel()
- d.transport = DummyChannel.SSL()
- d.transport.port = 81
- request = server.Request(d, 1)
- request.setHost('example.com', 81)
- request.gotLength(0)
- request.requestReceived('GET', '/foo/bar', 'HTTP/1.0')
- self.assertEqual(request.prePathURL(), 'https://example.com:81/foo/bar')
-
- def testPrePathURLSetSSLHost(self):
- d = DummyChannel()
- d.transport = DummyChannel.TCP()
- d.transport.port = 81
- request = server.Request(d, 1)
- request.setHost('foo.com', 81, 1)
- request.gotLength(0)
- request.requestReceived('GET', '/foo/bar', 'HTTP/1.0')
- self.assertEqual(request.prePathURL(), 'https://foo.com:81/foo/bar')
-
-
- def test_prePathURLQuoting(self):
- """
- L{Request.prePathURL} quotes special characters in the URL segments to
- preserve the original meaning.
- """
- d = DummyChannel()
- request = server.Request(d, 1)
- request.setHost('example.com', 80)
- request.gotLength(0)
- request.requestReceived('GET', '/foo%2Fbar', 'HTTP/1.0')
- self.assertEqual(request.prePathURL(), 'http://example.com/foo%2Fbar')
-
-
- def testNotifyFinishConnectionLost(self):
- d = DummyChannel()
- d.transport = DummyChannel.TCP()
- request = server.Request(d, 1)
- finished = request.notifyFinish()
- request.connectionLost(error.ConnectionDone("Connection done"))
- return self.assertFailure(finished, error.ConnectionDone)
-
-
-class RootResource(resource.Resource):
- isLeaf=0
- def getChildWithDefault(self, name, request):
- request.rememberRootURL()
- return resource.Resource.getChildWithDefault(self, name, request)
- def render(self, request):
- return ''
-
-class RememberURLTest(unittest.TestCase):
- def createServer(self, r):
- chan = DummyChannel()
- chan.transport = DummyChannel.TCP()
- chan.site = server.Site(r)
- return chan
-
- def testSimple(self):
- r = resource.Resource()
- r.isLeaf=0
- rr = RootResource()
- r.putChild('foo', rr)
- rr.putChild('', rr)
- rr.putChild('bar', resource.Resource())
- chan = self.createServer(r)
- for url in ['/foo/', '/foo/bar', '/foo/bar/baz', '/foo/bar/']:
- request = server.Request(chan, 1)
- request.setHost('example.com', 81)
- request.gotLength(0)
- request.requestReceived('GET', url, 'HTTP/1.0')
- self.assertEqual(request.getRootURL(), "http://example.com/foo")
-
- def testRoot(self):
- rr = RootResource()
- rr.putChild('', rr)
- rr.putChild('bar', resource.Resource())
- chan = self.createServer(rr)
- for url in ['/', '/bar', '/bar/baz', '/bar/']:
- request = server.Request(chan, 1)
- request.setHost('example.com', 81)
- request.gotLength(0)
- request.requestReceived('GET', url, 'HTTP/1.0')
- self.assertEqual(request.getRootURL(), "http://example.com/")
-
-
-class NewRenderResource(resource.Resource):
- def render_GET(self, request):
- return "hi hi"
-
- def render_HEH(self, request):
- return "ho ho"
-
-
-class NewRenderTestCase(unittest.TestCase):
- def _getReq(self):
- d = DummyChannel()
- d.site.resource.putChild('newrender', NewRenderResource())
- d.transport = DummyChannel.TCP()
- d.transport.port = 81
- request = server.Request(d, 1)
- request.setHost('example.com', 81)
- request.gotLength(0)
- return request
-
- def testGoodMethods(self):
- req = self._getReq()
- req.requestReceived('GET', '/newrender', 'HTTP/1.0')
- self.assertEquals(req.transport.getvalue().splitlines()[-1], 'hi hi')
-
- req = self._getReq()
- req.requestReceived('HEH', '/newrender', 'HTTP/1.0')
- self.assertEquals(req.transport.getvalue().splitlines()[-1], 'ho ho')
-
- def testBadMethods(self):
- req = self._getReq()
- req.requestReceived('CONNECT', '/newrender', 'HTTP/1.0')
- self.assertEquals(req.code, 501)
-
- req = self._getReq()
- req.requestReceived('hlalauguG', '/newrender', 'HTTP/1.0')
- self.assertEquals(req.code, 501)
-
- def testImplicitHead(self):
- req = self._getReq()
- req.requestReceived('HEAD', '/newrender', 'HTTP/1.0')
- self.assertEquals(req.code, 200)
- self.assertEquals(-1, req.transport.getvalue().find('hi hi'))
-
-
-class SDResource(resource.Resource):
- def __init__(self,default): self.default=default
- def getChildWithDefault(self,name,request):
- d=defer.succeed(self.default)
- return util.DeferredResource(d).getChildWithDefault(name, request)
-
-class SDTest(unittest.TestCase):
-
- def testDeferredResource(self):
- r = resource.Resource()
- r.isLeaf = 1
- s = SDResource(r)
- d = DummyRequest(['foo', 'bar', 'baz'])
- resource.getChildForRequest(s, d)
- self.assertEqual(d.postpath, ['bar', 'baz'])
-
-class DummyRequestForLogTest(DummyRequest):
- uri='/dummy' # parent class uri has "http://", which doesn't really happen
- code = 123
- client = '1.2.3.4'
- clientproto = 'HTTP/1.0'
- sentLength = None
-
- def __init__(self, *a, **kw):
- DummyRequest.__init__(self, *a, **kw)
- self.headers = {}
-
- def getHeader(self, h):
- return self.headers.get(h.lower(), None)
-
- def getClientIP(self):
- return self.client
-
-class TestLogEscaping(unittest.TestCase):
- def setUp(self):
- self.site = http.HTTPFactory()
- self.site.logFile = StringIO()
- self.request = DummyRequestForLogTest(self.site, False)
-
- def testSimple(self):
- http._logDateTime = "[%02d/%3s/%4d:%02d:%02d:%02d +0000]" % (
- 25, 'Oct', 2004, 12, 31, 59)
- self.site.log(self.request)
- self.site.logFile.seek(0)
- self.assertEqual(
- self.site.logFile.read(),
- '1.2.3.4 - - [25/Oct/2004:12:31:59 +0000] "GET /dummy HTTP/1.0" 123 - "-" "-"\n')
-
- def testMethodQuote(self):
- http._logDateTime = "[%02d/%3s/%4d:%02d:%02d:%02d +0000]" % (
- 25, 'Oct', 2004, 12, 31, 59)
- self.request.method = 'G"T'
- self.site.log(self.request)
- self.site.logFile.seek(0)
- self.assertEqual(
- self.site.logFile.read(),
- '1.2.3.4 - - [25/Oct/2004:12:31:59 +0000] "G\\"T /dummy HTTP/1.0" 123 - "-" "-"\n')
-
- def testRequestQuote(self):
- http._logDateTime = "[%02d/%3s/%4d:%02d:%02d:%02d +0000]" % (
- 25, 'Oct', 2004, 12, 31, 59)
- self.request.uri='/dummy"withquote'
- self.site.log(self.request)
- self.site.logFile.seek(0)
- self.assertEqual(
- self.site.logFile.read(),
- '1.2.3.4 - - [25/Oct/2004:12:31:59 +0000] "GET /dummy\\"withquote HTTP/1.0" 123 - "-" "-"\n')
-
- def testProtoQuote(self):
- http._logDateTime = "[%02d/%3s/%4d:%02d:%02d:%02d +0000]" % (
- 25, 'Oct', 2004, 12, 31, 59)
- self.request.clientproto='HT"P/1.0'
- self.site.log(self.request)
- self.site.logFile.seek(0)
- self.assertEqual(
- self.site.logFile.read(),
- '1.2.3.4 - - [25/Oct/2004:12:31:59 +0000] "GET /dummy HT\\"P/1.0" 123 - "-" "-"\n')
-
- def testRefererQuote(self):
- http._logDateTime = "[%02d/%3s/%4d:%02d:%02d:%02d +0000]" % (
- 25, 'Oct', 2004, 12, 31, 59)
- self.request.headers['referer'] = 'http://malicious" ".website.invalid'
- self.site.log(self.request)
- self.site.logFile.seek(0)
- self.assertEqual(
- self.site.logFile.read(),
- '1.2.3.4 - - [25/Oct/2004:12:31:59 +0000] "GET /dummy HTTP/1.0" 123 - "http://malicious\\" \\".website.invalid" "-"\n')
-
- def testUserAgentQuote(self):
- http._logDateTime = "[%02d/%3s/%4d:%02d:%02d:%02d +0000]" % (
- 25, 'Oct', 2004, 12, 31, 59)
- self.request.headers['user-agent'] = 'Malicious Web" Evil'
- self.site.log(self.request)
- self.site.logFile.seek(0)
- self.assertEqual(
- self.site.logFile.read(),
- '1.2.3.4 - - [25/Oct/2004:12:31:59 +0000] "GET /dummy HTTP/1.0" 123 - "-" "Malicious Web\\" Evil"\n')
« no previous file with comments | « third_party/twisted_8_1/twisted/web/test/test_tap.py ('k') | third_party/twisted_8_1/twisted/web/test/test_webclient.py » ('j') | no next file with comments »

Powered by Google App Engine
This is Rietveld 408576698