Index: third_party/twisted_8_1/twisted/web/test/test_http.py |
diff --git a/third_party/twisted_8_1/twisted/web/test/test_http.py b/third_party/twisted_8_1/twisted/web/test/test_http.py |
deleted file mode 100644 |
index 97caba54f4b43365247fd6a62f851024cd0c2bfd..0000000000000000000000000000000000000000 |
--- a/third_party/twisted_8_1/twisted/web/test/test_http.py |
+++ /dev/null |
@@ -1,548 +0,0 @@ |
-# Copyright (c) 2001-2007 Twisted Matrix Laboratories. |
-# See LICENSE for details. |
- |
- |
-""" |
-Test HTTP support. |
-""" |
- |
-from urlparse import urlparse, urlunsplit, clear_cache |
-import string, random, urllib, cgi |
- |
-from twisted.trial import unittest |
-from twisted.web import http |
-from twisted.protocols import loopback |
-from twisted.internet import protocol |
-from twisted.test.test_protocols import StringIOWithoutClosing |
- |
- |
-class DateTimeTest(unittest.TestCase): |
- """Test date parsing functions.""" |
- |
- def testRoundtrip(self): |
- for i in range(10000): |
- time = random.randint(0, 2000000000) |
- timestr = http.datetimeToString(time) |
- time2 = http.stringToDatetime(timestr) |
- self.assertEquals(time, time2) |
- |
- |
-class OrderedDict: |
- |
- def __init__(self, dict): |
- self.dict = dict |
- self.l = dict.keys() |
- |
- def __setitem__(self, k, v): |
- self.l.append(k) |
- self.dict[k] = v |
- |
- def __getitem__(self, k): |
- return self.dict[k] |
- |
- def items(self): |
- result = [] |
- for i in self.l: |
- result.append((i, self.dict[i])) |
- return result |
- |
- def __getattr__(self, attr): |
- return getattr(self.dict, attr) |
- |
- |
-class DummyHTTPHandler(http.Request): |
- |
- def process(self): |
- self.headers = OrderedDict(self.headers) |
- self.content.seek(0, 0) |
- data = self.content.read() |
- length = self.getHeader('content-length') |
- request = "'''\n"+str(length)+"\n"+data+"'''\n" |
- self.setResponseCode(200) |
- self.setHeader("Request", self.uri) |
- self.setHeader("Command", self.method) |
- self.setHeader("Version", self.clientproto) |
- self.setHeader("Content-Length", len(request)) |
- self.write(request) |
- self.finish() |
- |
- |
-class LoopbackHTTPClient(http.HTTPClient): |
- |
- def connectionMade(self): |
- self.sendCommand("GET", "/foo/bar") |
- self.sendHeader("Content-Length", 10) |
- self.endHeaders() |
- self.transport.write("0123456789") |
- |
- |
-class HTTP1_0TestCase(unittest.TestCase): |
- |
- requests = '''\ |
-GET / HTTP/1.0 |
- |
-GET / HTTP/1.1 |
-Accept: text/html |
- |
-''' |
- requests = string.replace(requests, '\n', '\r\n') |
- |
- expected_response = "HTTP/1.0 200 OK\015\012Request: /\015\012Command: GET\015\012Version: HTTP/1.0\015\012Content-length: 13\015\012\015\012'''\012None\012'''\012" |
- |
- def test_buffer(self): |
- """ |
- Send requests over a channel and check responses match what is expected. |
- """ |
- b = StringIOWithoutClosing() |
- a = http.HTTPChannel() |
- a.requestFactory = DummyHTTPHandler |
- a.makeConnection(protocol.FileWrapper(b)) |
- # one byte at a time, to stress it. |
- for byte in self.requests: |
- a.dataReceived(byte) |
- a.connectionLost(IOError("all one")) |
- value = b.getvalue() |
- self.assertEquals(value, self.expected_response) |
- |
- |
-class HTTP1_1TestCase(HTTP1_0TestCase): |
- |
- requests = '''\ |
-GET / HTTP/1.1 |
-Accept: text/html |
- |
-POST / HTTP/1.1 |
-Content-Length: 10 |
- |
-0123456789POST / HTTP/1.1 |
-Content-Length: 10 |
- |
-0123456789HEAD / HTTP/1.1 |
- |
-''' |
- requests = string.replace(requests, '\n', '\r\n') |
- |
- expected_response = "HTTP/1.1 200 OK\015\012Request: /\015\012Command: GET\015\012Version: HTTP/1.1\015\012Content-length: 13\015\012\015\012'''\012None\012'''\012HTTP/1.1 200 OK\015\012Request: /\015\012Command: POST\015\012Version: HTTP/1.1\015\012Content-length: 21\015\012\015\012'''\01210\0120123456789'''\012HTTP/1.1 200 OK\015\012Request: /\015\012Command: POST\015\012Version: HTTP/1.1\015\012Content-length: 21\015\012\015\012'''\01210\0120123456789'''\012HTTP/1.1 200 OK\015\012Request: /\015\012Command: HEAD\015\012Version: HTTP/1.1\015\012Content-length: 13\015\012\015\012" |
- |
-class HTTP1_1_close_TestCase(HTTP1_0TestCase): |
- |
- requests = '''\ |
-GET / HTTP/1.1 |
-Accept: text/html |
-Connection: close |
- |
-GET / HTTP/1.0 |
- |
-''' |
- |
- requests = string.replace(requests, '\n', '\r\n') |
- |
- expected_response = "HTTP/1.1 200 OK\015\012Connection: close\015\012Request: /\015\012Command: GET\015\012Version: HTTP/1.1\015\012Content-length: 13\015\012\015\012'''\012None\012'''\012" |
- |
- |
-class HTTP0_9TestCase(HTTP1_0TestCase): |
- |
- requests = '''\ |
-GET / |
-''' |
- requests = string.replace(requests, '\n', '\r\n') |
- |
- expected_response = "HTTP/1.1 400 Bad Request\r\n\r\n" |
- |
- |
-class HTTPLoopbackTestCase(unittest.TestCase): |
- |
- expectedHeaders = {'request' : '/foo/bar', |
- 'command' : 'GET', |
- 'version' : 'HTTP/1.0', |
- 'content-length' : '21'} |
- numHeaders = 0 |
- gotStatus = 0 |
- gotResponse = 0 |
- gotEndHeaders = 0 |
- |
- def _handleStatus(self, version, status, message): |
- self.gotStatus = 1 |
- self.assertEquals(version, "HTTP/1.0") |
- self.assertEquals(status, "200") |
- |
- def _handleResponse(self, data): |
- self.gotResponse = 1 |
- self.assertEquals(data, "'''\n10\n0123456789'''\n") |
- |
- def _handleHeader(self, key, value): |
- self.numHeaders = self.numHeaders + 1 |
- self.assertEquals(self.expectedHeaders[string.lower(key)], value) |
- |
- def _handleEndHeaders(self): |
- self.gotEndHeaders = 1 |
- self.assertEquals(self.numHeaders, 4) |
- |
- def testLoopback(self): |
- server = http.HTTPChannel() |
- server.requestFactory = DummyHTTPHandler |
- client = LoopbackHTTPClient() |
- client.handleResponse = self._handleResponse |
- client.handleHeader = self._handleHeader |
- client.handleEndHeaders = self._handleEndHeaders |
- client.handleStatus = self._handleStatus |
- d = loopback.loopbackAsync(server, client) |
- d.addCallback(self._cbTestLoopback) |
- return d |
- |
- def _cbTestLoopback(self, ignored): |
- if not (self.gotStatus and self.gotResponse and self.gotEndHeaders): |
- raise RuntimeError( |
- "didn't got all callbacks %s" |
- % [self.gotStatus, self.gotResponse, self.gotEndHeaders]) |
- del self.gotEndHeaders |
- del self.gotResponse |
- del self.gotStatus |
- del self.numHeaders |
- |
- |
-class PRequest: |
- """Dummy request for persistence tests.""" |
- |
- def __init__(self, **headers): |
- self.received_headers = headers |
- self.headers = {} |
- |
- def getHeader(self, k): |
- return self.received_headers.get(k, '') |
- |
- def setHeader(self, k, v): |
- self.headers[k] = v |
- |
- |
-class PersistenceTestCase(unittest.TestCase): |
- """Tests for persistent HTTP connections.""" |
- |
- ptests = [#(PRequest(connection="Keep-Alive"), "HTTP/1.0", 1, {'connection' : 'Keep-Alive'}), |
- (PRequest(), "HTTP/1.0", 0, {'connection': None}), |
- (PRequest(connection="close"), "HTTP/1.1", 0, {'connection' : 'close'}), |
- (PRequest(), "HTTP/1.1", 1, {'connection': None}), |
- (PRequest(), "HTTP/0.9", 0, {'connection': None}), |
- ] |
- |
- |
- def testAlgorithm(self): |
- c = http.HTTPChannel() |
- for req, version, correctResult, resultHeaders in self.ptests: |
- result = c.checkPersistence(req, version) |
- self.assertEquals(result, correctResult) |
- for header in resultHeaders.keys(): |
- self.assertEquals(req.headers.get(header, None), resultHeaders[header]) |
- |
- |
-class ChunkingTestCase(unittest.TestCase): |
- |
- strings = ["abcv", "", "fdfsd423", "Ffasfas\r\n", |
- "523523\n\rfsdf", "4234"] |
- |
- def testChunks(self): |
- for s in self.strings: |
- self.assertEquals((s, ''), http.fromChunk(''.join(http.toChunk(s)))) |
- self.assertRaises(ValueError, http.fromChunk, '-5\r\nmalformed!\r\n') |
- |
- def testConcatenatedChunks(self): |
- chunked = ''.join([''.join(http.toChunk(t)) for t in self.strings]) |
- result = [] |
- buffer = "" |
- for c in chunked: |
- buffer = buffer + c |
- try: |
- data, buffer = http.fromChunk(buffer) |
- result.append(data) |
- except ValueError: |
- pass |
- self.assertEquals(result, self.strings) |
- |
- |
- |
-class ParsingTestCase(unittest.TestCase): |
- |
- def runRequest(self, httpRequest, requestClass, success=1): |
- httpRequest = httpRequest.replace("\n", "\r\n") |
- b = StringIOWithoutClosing() |
- a = http.HTTPChannel() |
- a.requestFactory = requestClass |
- a.makeConnection(protocol.FileWrapper(b)) |
- # one byte at a time, to stress it. |
- for byte in httpRequest: |
- if a.transport.closed: |
- break |
- a.dataReceived(byte) |
- a.connectionLost(IOError("all done")) |
- if success: |
- self.assertEquals(self.didRequest, 1) |
- del self.didRequest |
- else: |
- self.assert_(not hasattr(self, "didRequest")) |
- |
- def testBasicAuth(self): |
- testcase = self |
- class Request(http.Request): |
- l = [] |
- def process(self): |
- testcase.assertEquals(self.getUser(), self.l[0]) |
- testcase.assertEquals(self.getPassword(), self.l[1]) |
- for u, p in [("foo", "bar"), ("hello", "there:z")]: |
- Request.l[:] = [u, p] |
- s = "%s:%s" % (u, p) |
- f = "GET / HTTP/1.0\nAuthorization: Basic %s\n\n" % (s.encode("base64").strip(), ) |
- self.runRequest(f, Request, 0) |
- |
- def testTooManyHeaders(self): |
- httpRequest = "GET / HTTP/1.0\n" |
- for i in range(502): |
- httpRequest += "%s: foo\n" % i |
- httpRequest += "\n" |
- class MyRequest(http.Request): |
- def process(self): |
- raise RuntimeError, "should not get called" |
- self.runRequest(httpRequest, MyRequest, 0) |
- |
- def testHeaders(self): |
- httpRequest = """\ |
-GET / HTTP/1.0 |
-Foo: bar |
-baz: 1 2 3 |
- |
-""" |
- testcase = self |
- |
- class MyRequest(http.Request): |
- def process(self): |
- testcase.assertEquals(self.getHeader('foo'), 'bar') |
- testcase.assertEquals(self.getHeader('Foo'), 'bar') |
- testcase.assertEquals(self.getHeader('bAz'), '1 2 3') |
- testcase.didRequest = 1 |
- self.finish() |
- |
- self.runRequest(httpRequest, MyRequest) |
- |
- def testCookies(self): |
- """ |
- Test cookies parsing and reading. |
- """ |
- httpRequest = '''\ |
-GET / HTTP/1.0 |
-Cookie: rabbit="eat carrot"; ninja=secret; spam="hey 1=1!" |
- |
-''' |
- testcase = self |
- |
- class MyRequest(http.Request): |
- def process(self): |
- testcase.assertEquals(self.getCookie('rabbit'), '"eat carrot"') |
- testcase.assertEquals(self.getCookie('ninja'), 'secret') |
- testcase.assertEquals(self.getCookie('spam'), '"hey 1=1!"') |
- testcase.didRequest = 1 |
- self.finish() |
- |
- self.runRequest(httpRequest, MyRequest) |
- |
- def testGET(self): |
- httpRequest = '''\ |
-GET /?key=value&multiple=two+words&multiple=more%20words&empty= HTTP/1.0 |
- |
-''' |
- testcase = self |
- class MyRequest(http.Request): |
- def process(self): |
- testcase.assertEquals(self.method, "GET") |
- testcase.assertEquals(self.args["key"], ["value"]) |
- testcase.assertEquals(self.args["empty"], [""]) |
- testcase.assertEquals(self.args["multiple"], ["two words", "more words"]) |
- testcase.didRequest = 1 |
- self.finish() |
- |
- self.runRequest(httpRequest, MyRequest) |
- |
- |
- def test_extraQuestionMark(self): |
- """ |
- While only a single '?' is allowed in an URL, several other servers |
- allow several and pass all after the first through as part of the |
- query arguments. Test that we emulate this behavior. |
- """ |
- httpRequest = 'GET /foo?bar=?&baz=quux HTTP/1.0\n\n' |
- |
- testcase = self |
- class MyRequest(http.Request): |
- def process(self): |
- testcase.assertEqual(self.method, 'GET') |
- testcase.assertEqual(self.path, '/foo') |
- testcase.assertEqual(self.args['bar'], ['?']) |
- testcase.assertEqual(self.args['baz'], ['quux']) |
- testcase.didRequest = 1 |
- self.finish() |
- |
- self.runRequest(httpRequest, MyRequest) |
- |
- |
- def testPOST(self): |
- query = 'key=value&multiple=two+words&multiple=more%20words&empty=' |
- httpRequest = '''\ |
-POST / HTTP/1.0 |
-Content-Length: %d |
-Content-Type: application/x-www-form-urlencoded |
- |
-%s''' % (len(query), query) |
- |
- testcase = self |
- class MyRequest(http.Request): |
- def process(self): |
- testcase.assertEquals(self.method, "POST") |
- testcase.assertEquals(self.args["key"], ["value"]) |
- testcase.assertEquals(self.args["empty"], [""]) |
- testcase.assertEquals(self.args["multiple"], ["two words", "more words"]) |
- testcase.didRequest = 1 |
- self.finish() |
- |
- self.runRequest(httpRequest, MyRequest) |
- |
- def testMissingContentDisposition(self): |
- req = '''\ |
-POST / HTTP/1.0 |
-Content-Type: multipart/form-data; boundary=AaB03x |
-Content-Length: 103 |
- |
---AaB03x |
-Content-Type: text/plain |
-Content-Transfer-Encoding: quoted-printable |
- |
-abasdfg |
---AaB03x-- |
-''' |
- self.runRequest(req, http.Request, success=False) |
- |
-class QueryArgumentsTestCase(unittest.TestCase): |
- def testUnquote(self): |
- try: |
- from twisted.protocols import _c_urlarg |
- except ImportError: |
- raise unittest.SkipTest("_c_urlarg module is not available") |
- # work exactly like urllib.unquote, including stupid things |
- # % followed by a non-hexdigit in the middle and in the end |
- self.failUnlessEqual(urllib.unquote("%notreally%n"), |
- _c_urlarg.unquote("%notreally%n")) |
- # % followed by hexdigit, followed by non-hexdigit |
- self.failUnlessEqual(urllib.unquote("%1quite%1"), |
- _c_urlarg.unquote("%1quite%1")) |
- # unquoted text, followed by some quoted chars, ends in a trailing % |
- self.failUnlessEqual(urllib.unquote("blah%21%40%23blah%"), |
- _c_urlarg.unquote("blah%21%40%23blah%")) |
- # Empty string |
- self.failUnlessEqual(urllib.unquote(""), _c_urlarg.unquote("")) |
- |
- def testParseqs(self): |
- self.failUnlessEqual(cgi.parse_qs("a=b&d=c;+=f"), |
- http.parse_qs("a=b&d=c;+=f")) |
- self.failUnlessRaises(ValueError, http.parse_qs, "blah", |
- strict_parsing = 1) |
- self.failUnlessEqual(cgi.parse_qs("a=&b=c", keep_blank_values = 1), |
- http.parse_qs("a=&b=c", keep_blank_values = 1)) |
- self.failUnlessEqual(cgi.parse_qs("a=&b=c"), |
- http.parse_qs("a=&b=c")) |
- |
- |
- def test_urlparse(self): |
- """ |
- For a given URL, L{http.urlparse} should behave the same as |
- L{urlparse}, except it should always return C{str}, never C{unicode}. |
- """ |
- def urls(): |
- for scheme in ('http', 'https'): |
- for host in ('example.com',): |
- for port in (None, 100): |
- for path in ('', 'path'): |
- if port is not None: |
- host = host + ':' + str(port) |
- yield urlunsplit((scheme, host, path, '', '')) |
- |
- |
- def assertSameParsing(url, decode): |
- """ |
- Verify that C{url} is parsed into the same objects by both |
- L{http.urlparse} and L{urlparse}. |
- """ |
- urlToStandardImplementation = url |
- if decode: |
- urlToStandardImplementation = url.decode('ascii') |
- standardResult = urlparse(urlToStandardImplementation) |
- scheme, netloc, path, params, query, fragment = http.urlparse(url) |
- self.assertEqual( |
- (scheme, netloc, path, params, query, fragment), |
- standardResult) |
- self.assertTrue(isinstance(scheme, str)) |
- self.assertTrue(isinstance(netloc, str)) |
- self.assertTrue(isinstance(path, str)) |
- self.assertTrue(isinstance(params, str)) |
- self.assertTrue(isinstance(query, str)) |
- self.assertTrue(isinstance(fragment, str)) |
- |
- # With caching, unicode then str |
- clear_cache() |
- for url in urls(): |
- assertSameParsing(url, True) |
- assertSameParsing(url, False) |
- |
- # With caching, str then unicode |
- clear_cache() |
- for url in urls(): |
- assertSameParsing(url, False) |
- assertSameParsing(url, True) |
- |
- # Without caching |
- for url in urls(): |
- clear_cache() |
- assertSameParsing(url, True) |
- clear_cache() |
- assertSameParsing(url, False) |
- |
- |
- def test_urlparseRejectsUnicode(self): |
- """ |
- L{http.urlparse} should reject unicode input early. |
- """ |
- self.assertRaises(TypeError, http.urlparse, u'http://example.org/path') |
- |
- |
- def testEscchar(self): |
- try: |
- from twisted.protocols import _c_urlarg |
- except ImportError: |
- raise unittest.SkipTest("_c_urlarg module is not available") |
- self.failUnlessEqual("!@#+b", |
- _c_urlarg.unquote("+21+40+23+b", "+")) |
- |
-class ClientDriver(http.HTTPClient): |
- def handleStatus(self, version, status, message): |
- self.version = version |
- self.status = status |
- self.message = message |
- |
-class ClientStatusParsing(unittest.TestCase): |
- def testBaseline(self): |
- c = ClientDriver() |
- c.lineReceived('HTTP/1.0 201 foo') |
- self.failUnlessEqual(c.version, 'HTTP/1.0') |
- self.failUnlessEqual(c.status, '201') |
- self.failUnlessEqual(c.message, 'foo') |
- |
- def testNoMessage(self): |
- c = ClientDriver() |
- c.lineReceived('HTTP/1.0 201') |
- self.failUnlessEqual(c.version, 'HTTP/1.0') |
- self.failUnlessEqual(c.status, '201') |
- self.failUnlessEqual(c.message, '') |
- |
- def testNoMessage_trailingSpace(self): |
- c = ClientDriver() |
- c.lineReceived('HTTP/1.0 201 ') |
- self.failUnlessEqual(c.version, 'HTTP/1.0') |
- self.failUnlessEqual(c.status, '201') |
- self.failUnlessEqual(c.message, '') |
- |