| Index: third_party/twisted_8_1/twisted/web/test/test_webclient.py
|
| diff --git a/third_party/twisted_8_1/twisted/web/test/test_webclient.py b/third_party/twisted_8_1/twisted/web/test/test_webclient.py
|
| deleted file mode 100644
|
| index a1bab7899ccc667f1eb45c809acaa9213e4558a7..0000000000000000000000000000000000000000
|
| --- a/third_party/twisted_8_1/twisted/web/test/test_webclient.py
|
| +++ /dev/null
|
| @@ -1,475 +0,0 @@
|
| -# Copyright (c) 2001-2008 Twisted Matrix Laboratories.
|
| -# See LICENSE for details.
|
| -
|
| -"""
|
| -Tests for L{twisted.web.client}.
|
| -"""
|
| -
|
| -import os
|
| -
|
| -from urlparse import urlparse
|
| -
|
| -from twisted.trial import unittest
|
| -from twisted.web import server, static, client, error, util, resource
|
| -from twisted.internet import reactor, defer, interfaces
|
| -from twisted.python.filepath import FilePath
|
| -
|
| -try:
|
| - from twisted.internet import ssl
|
| -except:
|
| - ssl = None
|
| -
|
| -serverCallID = None
|
| -
|
| -class LongTimeTakingResource(resource.Resource):
|
| - def render(self, request):
|
| - global serverCallID
|
| - serverCallID = reactor.callLater(1, self.writeIt, request)
|
| - return server.NOT_DONE_YET
|
| -
|
| - def writeIt(self, request):
|
| - request.write("hello!!!")
|
| - request.finish()
|
| -
|
| -class CookieMirrorResource(resource.Resource):
|
| - def render(self, request):
|
| - l = []
|
| - for k,v in request.received_cookies.items():
|
| - l.append((k, v))
|
| - l.sort()
|
| - return repr(l)
|
| -
|
| -class RawCookieMirrorResource(resource.Resource):
|
| - def render(self, request):
|
| - return repr(request.getHeader('cookie'))
|
| -
|
| -class ErrorResource(resource.Resource):
|
| -
|
| - def render(self, request):
|
| - request.setResponseCode(401)
|
| - if request.args.get("showlength"):
|
| - request.setHeader("content-length", "0")
|
| - return ""
|
| -
|
| -class NoLengthResource(resource.Resource):
|
| -
|
| - def render(self, request):
|
| - return "nolength"
|
| -
|
| -class HostHeaderResource(resource.Resource):
|
| -
|
| - def render(self, request):
|
| - return request.received_headers["host"]
|
| -
|
| -class PayloadResource(resource.Resource):
|
| -
|
| - def render(self, request):
|
| - data = request.content.read()
|
| - if len(data) != 100 or int(request.received_headers["content-length"]) != 100:
|
| - return "ERROR"
|
| - return data
|
| -
|
| -class BrokenDownloadResource(resource.Resource):
|
| -
|
| - def render(self, request):
|
| - # only sends 3 bytes even though it claims to send 5
|
| - request.setHeader("content-length", "5")
|
| - request.write('abc')
|
| - return ''
|
| -
|
| -
|
| -
|
| -class ParseUrlTestCase(unittest.TestCase):
|
| - """
|
| - Test URL parsing facility and defaults values.
|
| - """
|
| -
|
| - def testParse(self):
|
| - scheme, host, port, path = client._parse("http://127.0.0.1/")
|
| - self.assertEquals(path, "/")
|
| - self.assertEquals(port, 80)
|
| - scheme, host, port, path = client._parse("https://127.0.0.1/")
|
| - self.assertEquals(path, "/")
|
| - self.assertEquals(port, 443)
|
| - scheme, host, port, path = client._parse("http://spam:12345/")
|
| - self.assertEquals(port, 12345)
|
| - scheme, host, port, path = client._parse("http://foo ")
|
| - self.assertEquals(host, "foo")
|
| - self.assertEquals(path, "/")
|
| - scheme, host, port, path = client._parse("http://egg:7890")
|
| - self.assertEquals(port, 7890)
|
| - self.assertEquals(host, "egg")
|
| - self.assertEquals(path, "/")
|
| -
|
| -
|
| - def test_externalUnicodeInterference(self):
|
| - """
|
| - L{client._parse} should return C{str} for the scheme, host, and path
|
| - elements of its return tuple, even when passed an URL which has
|
| - previously been passed to L{urlparse} as a C{unicode} string.
|
| - """
|
| - badInput = u'http://example.com/path'
|
| - goodInput = badInput.encode('ascii')
|
| - urlparse(badInput)
|
| - scheme, host, port, path = client._parse(goodInput)
|
| - self.assertTrue(isinstance(scheme, str))
|
| - self.assertTrue(isinstance(host, str))
|
| - self.assertTrue(isinstance(path, str))
|
| -
|
| -
|
| -
|
| -class WebClientTestCase(unittest.TestCase):
|
| - def _listen(self, site):
|
| - return reactor.listenTCP(0, site, interface="127.0.0.1")
|
| -
|
| - def setUp(self):
|
| - name = self.mktemp()
|
| - os.mkdir(name)
|
| - FilePath(name).child("file").setContent("0123456789")
|
| - r = static.File(name)
|
| - r.putChild("redirect", util.Redirect("/file"))
|
| - r.putChild("wait", LongTimeTakingResource())
|
| - r.putChild("error", ErrorResource())
|
| - r.putChild("nolength", NoLengthResource())
|
| - r.putChild("host", HostHeaderResource())
|
| - r.putChild("payload", PayloadResource())
|
| - r.putChild("broken", BrokenDownloadResource())
|
| - site = server.Site(r, timeout=None)
|
| - self.port = self._listen(site)
|
| - self.portno = self.port.getHost().port
|
| -
|
| - def tearDown(self):
|
| - if serverCallID and serverCallID.active():
|
| - serverCallID.cancel()
|
| - return self.port.stopListening()
|
| -
|
| - def getURL(self, path):
|
| - return "http://127.0.0.1:%d/%s" % (self.portno, path)
|
| -
|
| - def testPayload(self):
|
| - s = "0123456789" * 10
|
| - return client.getPage(self.getURL("payload"), postdata=s
|
| - ).addCallback(self.assertEquals, s
|
| - )
|
| -
|
| - def testBrokenDownload(self):
|
| - # test what happens when download gets disconnected in the middle
|
| - d = client.getPage(self.getURL("broken"))
|
| - d = self.assertFailure(d, client.PartialDownloadError)
|
| - d.addCallback(lambda exc: self.assertEquals(exc.response, "abc"))
|
| - return d
|
| -
|
| - def testHostHeader(self):
|
| - # if we pass Host header explicitly, it should be used, otherwise
|
| - # it should extract from url
|
| - return defer.gatherResults([
|
| - client.getPage(self.getURL("host")).addCallback(self.assertEquals, "127.0.0.1"),
|
| - client.getPage(self.getURL("host"), headers={"Host": "www.example.com"}).addCallback(self.assertEquals, "www.example.com")])
|
| -
|
| -
|
| - def test_getPage(self):
|
| - """
|
| - L{client.getPage} returns a L{Deferred} which is called back with
|
| - the body of the response if the default method B{GET} is used.
|
| - """
|
| - d = client.getPage(self.getURL("file"))
|
| - d.addCallback(self.assertEquals, "0123456789")
|
| - return d
|
| -
|
| -
|
| - def test_getPageHead(self):
|
| - """
|
| - L{client.getPage} returns a L{Deferred} which is called back with
|
| - the empty string if the method is C{HEAD} and there is a successful
|
| - response code.
|
| - """
|
| - def getPage(method):
|
| - return client.getPage(self.getURL("file"), method=method)
|
| - return defer.gatherResults([
|
| - getPage("head").addCallback(self.assertEqual, ""),
|
| - getPage("HEAD").addCallback(self.assertEqual, "")])
|
| -
|
| -
|
| - def testTimeoutNotTriggering(self):
|
| - # Test that when the timeout doesn't trigger, things work as expected.
|
| - d = client.getPage(self.getURL("wait"), timeout=100)
|
| - d.addCallback(self.assertEquals, "hello!!!")
|
| - return d
|
| -
|
| - def testTimeoutTriggering(self):
|
| - # Test that when the timeout does trigger, we get a defer.TimeoutError.
|
| - return self.assertFailure(
|
| - client.getPage(self.getURL("wait"), timeout=0.5),
|
| - defer.TimeoutError)
|
| -
|
| - def testDownloadPage(self):
|
| - downloads = []
|
| - downloadData = [("file", self.mktemp(), "0123456789"),
|
| - ("nolength", self.mktemp(), "nolength")]
|
| -
|
| - for (url, name, data) in downloadData:
|
| - d = client.downloadPage(self.getURL(url), name)
|
| - d.addCallback(self._cbDownloadPageTest, data, name)
|
| - downloads.append(d)
|
| - return defer.gatherResults(downloads)
|
| -
|
| - def _cbDownloadPageTest(self, ignored, data, name):
|
| - bytes = file(name, "rb").read()
|
| - self.assertEquals(bytes, data)
|
| -
|
| - def testDownloadPageError1(self):
|
| - class errorfile:
|
| - def write(self, data):
|
| - raise IOError, "badness happened during write"
|
| - def close(self):
|
| - pass
|
| - ef = errorfile()
|
| - return self.assertFailure(
|
| - client.downloadPage(self.getURL("file"), ef),
|
| - IOError)
|
| -
|
| - def testDownloadPageError2(self):
|
| - class errorfile:
|
| - def write(self, data):
|
| - pass
|
| - def close(self):
|
| - raise IOError, "badness happened during close"
|
| - ef = errorfile()
|
| - return self.assertFailure(
|
| - client.downloadPage(self.getURL("file"), ef),
|
| - IOError)
|
| -
|
| - def testDownloadPageError3(self):
|
| - # make sure failures in open() are caught too. This is tricky.
|
| - # Might only work on posix.
|
| - tmpfile = open("unwritable", "wb")
|
| - tmpfile.close()
|
| - os.chmod("unwritable", 0) # make it unwritable (to us)
|
| - d = self.assertFailure(
|
| - client.downloadPage(self.getURL("file"), "unwritable"),
|
| - IOError)
|
| - d.addBoth(self._cleanupDownloadPageError3)
|
| - return d
|
| -
|
| - def _cleanupDownloadPageError3(self, ignored):
|
| - os.chmod("unwritable", 0700)
|
| - os.unlink("unwritable")
|
| - return ignored
|
| -
|
| - def _downloadTest(self, method):
|
| - dl = []
|
| - for (url, code) in [("nosuchfile", "404"), ("error", "401"),
|
| - ("error?showlength=1", "401")]:
|
| - d = method(url)
|
| - d = self.assertFailure(d, error.Error)
|
| - d.addCallback(lambda exc, code=code: self.assertEquals(exc.args[0], code))
|
| - dl.append(d)
|
| - return defer.DeferredList(dl, fireOnOneErrback=True)
|
| -
|
| - def testServerError(self):
|
| - return self._downloadTest(lambda url: client.getPage(self.getURL(url)))
|
| -
|
| - def testDownloadServerError(self):
|
| - return self._downloadTest(lambda url: client.downloadPage(self.getURL(url), url.split('?')[0]))
|
| -
|
| - def testFactoryInfo(self):
|
| - url = self.getURL('file')
|
| - scheme, host, port, path = client._parse(url)
|
| - factory = client.HTTPClientFactory(url)
|
| - reactor.connectTCP(host, port, factory)
|
| - return factory.deferred.addCallback(self._cbFactoryInfo, factory)
|
| -
|
| - def _cbFactoryInfo(self, ignoredResult, factory):
|
| - self.assertEquals(factory.status, '200')
|
| - self.assert_(factory.version.startswith('HTTP/'))
|
| - self.assertEquals(factory.message, 'OK')
|
| - self.assertEquals(factory.response_headers['content-length'][0], '10')
|
| -
|
| -
|
| - def testRedirect(self):
|
| - return client.getPage(self.getURL("redirect")).addCallback(self._cbRedirect)
|
| -
|
| - def _cbRedirect(self, pageData):
|
| - self.assertEquals(pageData, "0123456789")
|
| - d = self.assertFailure(
|
| - client.getPage(self.getURL("redirect"), followRedirect=0),
|
| - error.PageRedirect)
|
| - d.addCallback(self._cbCheckLocation)
|
| - return d
|
| -
|
| - def _cbCheckLocation(self, exc):
|
| - self.assertEquals(exc.location, "/file")
|
| -
|
| - def testPartial(self):
|
| - name = self.mktemp()
|
| - f = open(name, "wb")
|
| - f.write("abcd")
|
| - f.close()
|
| -
|
| - downloads = []
|
| - partialDownload = [(True, "abcd456789"),
|
| - (True, "abcd456789"),
|
| - (False, "0123456789")]
|
| -
|
| - d = defer.succeed(None)
|
| - for (partial, expectedData) in partialDownload:
|
| - d.addCallback(self._cbRunPartial, name, partial)
|
| - d.addCallback(self._cbPartialTest, expectedData, name)
|
| -
|
| - return d
|
| -
|
| - testPartial.skip = "Cannot test until webserver can serve partial data properly"
|
| -
|
| - def _cbRunPartial(self, ignored, name, partial):
|
| - return client.downloadPage(self.getURL("file"), name, supportPartial=partial)
|
| -
|
| - def _cbPartialTest(self, ignored, expectedData, filename):
|
| - bytes = file(filename, "rb").read()
|
| - self.assertEquals(bytes, expectedData)
|
| -
|
| -class WebClientSSLTestCase(WebClientTestCase):
|
| - def _listen(self, site):
|
| - from twisted import test
|
| - return reactor.listenSSL(0, site,
|
| - contextFactory=ssl.DefaultOpenSSLContextFactory(
|
| - FilePath(test.__file__).sibling('server.pem').path,
|
| - FilePath(test.__file__).sibling('server.pem').path,
|
| - ),
|
| - interface="127.0.0.1")
|
| -
|
| - def getURL(self, path):
|
| - return "https://127.0.0.1:%d/%s" % (self.portno, path)
|
| -
|
| - def testFactoryInfo(self):
|
| - url = self.getURL('file')
|
| - scheme, host, port, path = client._parse(url)
|
| - factory = client.HTTPClientFactory(url)
|
| - reactor.connectSSL(host, port, factory, ssl.ClientContextFactory())
|
| - # The base class defines _cbFactoryInfo correctly for this
|
| - return factory.deferred.addCallback(self._cbFactoryInfo, factory)
|
| -
|
| -class WebClientRedirectBetweenSSLandPlainText(unittest.TestCase):
|
| - def getHTTPS(self, path):
|
| - return "https://127.0.0.1:%d/%s" % (self.tlsPortno, path)
|
| -
|
| - def getHTTP(self, path):
|
| - return "http://127.0.0.1:%d/%s" % (self.plainPortno, path)
|
| -
|
| - def setUp(self):
|
| - plainRoot = static.Data('not me', 'text/plain')
|
| - tlsRoot = static.Data('me neither', 'text/plain')
|
| -
|
| - plainSite = server.Site(plainRoot, timeout=None)
|
| - tlsSite = server.Site(tlsRoot, timeout=None)
|
| -
|
| - from twisted import test
|
| - self.tlsPort = reactor.listenSSL(0, tlsSite,
|
| - contextFactory=ssl.DefaultOpenSSLContextFactory(
|
| - FilePath(test.__file__).sibling('server.pem').path,
|
| - FilePath(test.__file__).sibling('server.pem').path,
|
| - ),
|
| - interface="127.0.0.1")
|
| - self.plainPort = reactor.listenTCP(0, plainSite, interface="127.0.0.1")
|
| -
|
| - self.plainPortno = self.plainPort.getHost().port
|
| - self.tlsPortno = self.tlsPort.getHost().port
|
| -
|
| - plainRoot.putChild('one', util.Redirect(self.getHTTPS('two')))
|
| - tlsRoot.putChild('two', util.Redirect(self.getHTTP('three')))
|
| - plainRoot.putChild('three', util.Redirect(self.getHTTPS('four')))
|
| - tlsRoot.putChild('four', static.Data('FOUND IT!', 'text/plain'))
|
| -
|
| - def tearDown(self):
|
| - ds = map(defer.maybeDeferred,
|
| - [self.plainPort.stopListening, self.tlsPort.stopListening])
|
| - return defer.gatherResults(ds)
|
| -
|
| - def testHoppingAround(self):
|
| - return client.getPage(self.getHTTP("one")
|
| - ).addCallback(self.assertEquals, "FOUND IT!"
|
| - )
|
| -
|
| -class FakeTransport:
|
| - disconnecting = False
|
| - def __init__(self):
|
| - self.data = []
|
| - def write(self, stuff):
|
| - self.data.append(stuff)
|
| -
|
| -class CookieTestCase(unittest.TestCase):
|
| - def _listen(self, site):
|
| - return reactor.listenTCP(0, site, interface="127.0.0.1")
|
| -
|
| - def setUp(self):
|
| - root = static.Data('El toro!', 'text/plain')
|
| - root.putChild("cookiemirror", CookieMirrorResource())
|
| - root.putChild("rawcookiemirror", RawCookieMirrorResource())
|
| - site = server.Site(root, timeout=None)
|
| - self.port = self._listen(site)
|
| - self.portno = self.port.getHost().port
|
| -
|
| - def tearDown(self):
|
| - return self.port.stopListening()
|
| -
|
| - def getHTTP(self, path):
|
| - return "http://127.0.0.1:%d/%s" % (self.portno, path)
|
| -
|
| - def testNoCookies(self):
|
| - return client.getPage(self.getHTTP("cookiemirror")
|
| - ).addCallback(self.assertEquals, "[]"
|
| - )
|
| -
|
| - def testSomeCookies(self):
|
| - cookies = {'foo': 'bar', 'baz': 'quux'}
|
| - return client.getPage(self.getHTTP("cookiemirror"), cookies=cookies
|
| - ).addCallback(self.assertEquals, "[('baz', 'quux'), ('foo', 'bar')]"
|
| - )
|
| -
|
| - def testRawNoCookies(self):
|
| - return client.getPage(self.getHTTP("rawcookiemirror")
|
| - ).addCallback(self.assertEquals, "None"
|
| - )
|
| -
|
| - def testRawSomeCookies(self):
|
| - cookies = {'foo': 'bar', 'baz': 'quux'}
|
| - return client.getPage(self.getHTTP("rawcookiemirror"), cookies=cookies
|
| - ).addCallback(self.assertEquals, "'foo=bar; baz=quux'"
|
| - )
|
| -
|
| - def testCookieHeaderParsing(self):
|
| - d = defer.Deferred()
|
| - factory = client.HTTPClientFactory('http://foo.example.com/')
|
| - proto = factory.buildProtocol('127.42.42.42')
|
| - proto.transport = FakeTransport()
|
| - proto.connectionMade()
|
| - for line in [
|
| - '200 Ok',
|
| - 'Squash: yes',
|
| - 'Hands: stolen',
|
| - 'Set-Cookie: CUSTOMER=WILE_E_COYOTE; path=/; expires=Wednesday, 09-Nov-99 23:12:40 GMT',
|
| - 'Set-Cookie: PART_NUMBER=ROCKET_LAUNCHER_0001; path=/',
|
| - 'Set-Cookie: SHIPPING=FEDEX; path=/foo',
|
| - '',
|
| - 'body',
|
| - 'more body',
|
| - ]:
|
| - proto.dataReceived(line + '\r\n')
|
| - self.assertEquals(proto.transport.data,
|
| - ['GET / HTTP/1.0\r\n',
|
| - 'Host: foo.example.com\r\n',
|
| - 'User-Agent: Twisted PageGetter\r\n',
|
| - '\r\n'])
|
| - self.assertEquals(factory.cookies,
|
| - {
|
| - 'CUSTOMER': 'WILE_E_COYOTE',
|
| - 'PART_NUMBER': 'ROCKET_LAUNCHER_0001',
|
| - 'SHIPPING': 'FEDEX',
|
| - })
|
| -
|
| -if ssl is None or not hasattr(ssl, 'DefaultOpenSSLContextFactory'):
|
| - for case in [WebClientSSLTestCase, WebClientRedirectBetweenSSLandPlainText]:
|
| - case.skip = "OpenSSL not present"
|
| -
|
| -if not interfaces.IReactorSSL(reactor, None):
|
| - for case in [WebClientSSLTestCase, WebClientRedirectBetweenSSLandPlainText]:
|
| - case.skip = "Reactor doesn't support SSL"
|
|
|